博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java并发编程之:Queue
阅读量:5787 次
发布时间:2019-06-18

本文共 13807 字,大约阅读时间需要 46 分钟。

hot3.png

Queue基础

java.util 接口 Queue<E>

  抛出异常 返回特殊值
插入 add(e) offer(e)
移除 remove() poll()
检查 element() peek()

1.add

boolean add(E e)将指定的元素插入此队列(如果立即可行且不会违反容量限制),在成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。
指定者:
接口 Collection<E> 中的 add
参数:
e - 要添加的元素
返回:
true(根据 Collection.add(E) 的规定)
抛出:
IllegalStateException - 如果由于容量的限制此时不能添加该元素
ClassCastException - 如果指定元素的类不允许将其添加到此队列
NullPointerException - 如果指定元素为 null 并且此队列不允许 null 元素
IllegalArgumentException - 如果此元素的某些属性不允许将其添加到此队列

2.offer

boolean offer(E e)将指定的元素插入此队列(如果立即可行且不会违反容量限制),当使用有容量限制的队列时,此方法通常要优于 add(E),后者可能无法插入元素,而只是抛出一个异常。
参数:
e - 要添加的元素
返回:
如果该元素已添加到此队列,则返回 true;否则返回 false
抛出:
ClassCastException - 如果指定元素的类不允许将其添加到此队列
NullPointerException - 如果指定元素为 null 并且此队列不允许 null 元素
IllegalArgumentException - 如果此元素的某些属性不允许将其添加到此队列

3.remove

E remove()获取并移除此队列的头。此方法与 poll 唯一的不同在于:此队列为空时将抛出一个异常。
返回:
队列的头
抛出:
NoSuchElementException - 如果此队列为空

4.poll

E poll()获取并移除此队列的头,如果此队列为空,则返回 null。
返回:
队列的头,如果此队列为空,则返回 null

5.element

E element()获取,但是不移除此队列的头。此方法与 peek 唯一的不同在于:此队列为空时将抛出一个异常。
返回:
队列的头
抛出:
NoSuchElementException - 如果此队列为空

6.peek

E peek()获取但不移除此队列的头;如果此队列为空,则返回 null。
返回:
此队列的头;如果此队列为空,则返回 null

java.util.concurrent 接口 BlockingQueue<E>

BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:

   抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek()    

 

 

 

 

BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。

BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。
BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。
BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。
BlockingQueue 实质上不 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。
以下是基于典型的生产者-使用者场景的一个用例。注意,BlockingQueue 可以安全地与多个生产者和多个使用者一起使用。

class Producer implements Runnable {   private final BlockingQueue queue;   Producer(BlockingQueue q) { queue = q; }   public void run() {     try {       while(true) { queue.put(produce()); }     } catch (InterruptedException ex) { ... handle ...}   }   Object produce() { ... } } class Consumer implements Runnable {   private final BlockingQueue queue;   Consumer(BlockingQueue q) { queue = q; }   public void run() {     try {       while(true) { consume(queue.take()); }     } catch (InterruptedException ex) { ... handle ...}   }   void consume(Object x) { ... } } class Setup {   void main() {     BlockingQueue q = new SomeQueueImplementation();     Producer p = new Producer(q);     Consumer c1 = new Consumer(q);     Consumer c2 = new Consumer(q);     new Thread(p).start();     new Thread(c1).start();     new Thread(c2).start();   } }

内存一致性效果:当存在其他并发 collection 时,将对象放入 BlockingQueue 之前的线程中的操作 happen-before 随后通过另一线程从 BlockingQueue 中访问或移除该元素的操作。

put

void put(E e) throws InterruptedException将指定元素插入此队列中,将等待可用的空间(如果有必要)。
参数:
e - 要添加的元素
抛出:
InterruptedException - 如果在等待时被中断
ClassCastException - 如果指定元素的类不允许将其添加到此队列
NullPointerException - 如果指定元素为 null
IllegalArgumentException - 如果指定元素的某些属性不允许将其添加到此队列

offer

boolean offer(E e,
              long timeout,
              TimeUnit unit)
              throws InterruptedException将指定元素插入此队列中,在到达指定的等待时间前等待可用的空间(如果有必要)。
参数:
e - 要添加的元素
timeout - 放弃之前等待的时间长度,以 unit 为时间单位
unit - 确定如何解释 timeout 参数的 TimeUnit
返回:
如果成功,则返回 true;如果在空间可用前超过了指定的等待时间,则返回 false
抛出:
InterruptedException - 如果在等待时被中断
ClassCastException - 如果指定元素的类不允许将其添加到此队列
NullPointerException - 如果指定元素为 null
IllegalArgumentException - 如果指定元素的某些属性不允许将其添加到此队列

take

E take()
       throws InterruptedException获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
返回:
此队列的头部
抛出:
InterruptedException - 如果在等待时被中断

poll

E poll(long timeout,
       TimeUnit unit)
       throws InterruptedException获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
参数:
timeout - 放弃之前要等待的时间长度,用 unit 的时间单位表示
unit - 确定如何解释 timeout 参数的 TimeUnit
返回:
此队列的头部;如果在元素可用前超过了指定的等待时间,则返回 null
抛出:
InterruptedException - 如果在等待时被中断

 

Queue结构图

112914_5IwV_3101476.png

113235_Xbes_3101476.png

//高性能无阻塞无界队列:ConcurrentLinkedQueueConcurrentLinkedQueue
q = new ConcurrentLinkedQueue
();q.offer("a");q.offer("b");q.offer("c");q.offer("d");q.add("e");System.out.println(q.poll()); //a 从头部取出元素,并从队列里删除System.out.println(q.size()); //4System.out.println(q.peek()); //bSystem.out.println(q.size()); //4

113714_cK07_3101476.png

 

在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。

注:什么叫线程安全?这个首先要明确。线程安全就是说多线程访问同一代码,不会产生不确定的结果。

并行和并发区别

1、并行是指两者同时执行一件事,比如赛跑,两个人都在不停的往前跑;

2、并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率

LinkedBlockingQueue
 

由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

import java.util.concurrent.BlockingQueue;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.Executors;  import java.util.concurrent.LinkedBlockingQueue;    /**  * 多线程模拟实现生产者/消费者模型  *    * @author 林计钦  * @version 1.0 2013-7-25 下午05:23:11  */  public class BlockingQueueTest2 {      /**      *       * 定义装苹果的篮子      *       */      public class Basket {          // 篮子,能够容纳3个苹果          BlockingQueue
basket = new LinkedBlockingQueue
(3); // 生产苹果,放入篮子 public void produce() throws InterruptedException { // put方法放入一个苹果,若basket满了,等到basket有位置 basket.put("An apple"); } // 消费苹果,从篮子中取走 public String consume() throws InterruptedException { // take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部) return basket.take(); } } // 定义苹果生产者 class Producer implements Runnable { private String instance; private Basket basket; public Producer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { while (true) { // 生产苹果 System.out.println("生产者准备生产苹果:" + instance); basket.produce(); System.out.println("!生产者生产苹果完毕:" + instance); // 休眠300ms Thread.sleep(300); } } catch (InterruptedException ex) { System.out.println("Producer Interrupted"); } } } // 定义苹果消费者 class Consumer implements Runnable { private String instance; private Basket basket; public Consumer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { while (true) { // 消费苹果 System.out.println("消费者准备消费苹果:" + instance); System.out.println(basket.consume()); System.out.println("!消费者消费苹果完毕:" + instance); // 休眠1000ms Thread.sleep(1000); } } catch (InterruptedException ex) { System.out.println("Consumer Interrupted"); } } } public static void main(String[] args) { BlockingQueueTest2 test = new BlockingQueueTest2(); // 建立一个装苹果的篮子 Basket basket = test.new Basket(); ExecutorService service = Executors.newCachedThreadPool(); Producer producer = test.new Producer("生产者001", basket); Producer producer2 = test.new Producer("生产者002", basket); Consumer consumer = test.new Consumer("消费者001", basket); service.submit(producer); service.submit(producer2); service.submit(consumer); // 程序运行5s后,所有任务停止 // try { // Thread.sleep(1000 * 5); // } catch (InterruptedException e) { // e.printStackTrace(); // } // service.shutdownNow(); } }

ConcurrentLinkedQueue是Queue的一个安全实现.Queue中元素按FIFO原则进行排序.采用CAS操作,来保证元素的一致性。

LinkedBlockingQueue是一个线程安全的阻塞队列,它实现了BlockingQueue接口,BlockingQueue接口继承自java.util.Queue接口,并在这个接口的基础上增加了take和put方法,这两个方法正是队列操作的阻塞版本。

 

ConcurrentLinkedQueue

import java.util.concurrent.ConcurrentLinkedQueue;  import java.util.concurrent.CountDownLatch;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.Executors;    public class ConcurrentLinkedQueueTest {      private static ConcurrentLinkedQueue
queue = new ConcurrentLinkedQueue
(); private static int count = 2; // 线程个数 //CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。 private static CountDownLatch latch = new CountDownLatch(count); public static void main(String[] args) throws InterruptedException { long timeStart = System.currentTimeMillis(); ExecutorService es = Executors.newFixedThreadPool(4); ConcurrentLinkedQueueTest.offer(); for (int i = 0; i < count; i++) { es.submit(new Poll()); } latch.await(); //使得主线程(main)阻塞直到latch.countDown()为零才继续执行 System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms"); es.shutdown(); } /** * 生产 */ public static void offer() { for (int i = 0; i < 100000; i++) { queue.offer(i); } } /** * 消费 * * @author 林计钦 * @version 1.0 2013-7-25 下午05:32:56 */ static class Poll implements Runnable { public void run() { // while (queue.size()>0) { while (!queue.isEmpty()) { System.out.println(queue.poll()); } latch.countDown(); } } }

它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的。插入元素是追加到尾上。提取一个元素是从头提取。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。该队列不允许null元素。此实现采用了有效的“无等待 (wait-free)”,该算法基于 Maged M. Michael 和 Michael L. Scott 合著的  Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms 中描述的算法。

      注意和大多数集合不一样。得到元素个数所花的时间是不确定。由于该队列的异步特性,确定当前的元素数量需要遍历的元素。这个类和它的迭代器实现了Collection和Iterator接口的所有可选方法。

内存一致性效果:和其他并发集合一样。把一个元素放入到队列的线程的优先级高与对元素的访问和移除的线程

有三个函数需要注意的:

peek()获取元素 不移除头结点

poll() 获取元素并且在队列中移除,如果队列为空返回null

toArray

返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。如果指定的数组能容纳队列,则将该队列返回此处。否则,将分配一个具有指定数组的运行时类型和此队列大小的新数组。

如果指定的数组能容纳队列,并有剩余的空间(即数组的元素比队列多),那么会将紧接队列尾部的元素设置为 null。
像 toArray() 方法一样,此方法充当基于数组的 API 与基于 collection 的 API 之间的桥梁。更进一步说,此方法允许对输出数组的运行时类型进行精确控制,在某些情况下,这可以用来节省分配开销。

假定 x 是只包含字符串的一个已知队列。以下代码用来将该队列转储到一个新分配的 String 数组.

size:

返回此队列中的元素数量。如果此队列包含的元素数大于 Integer.MAX_VALUE,则返回 Integer.MAX_VALUE。

需要小心的是,与大多数 collection 不同,此方法不是 一个固定时间操作。由于这些队列的异步特性,确定当前的元素数需要进行一次花费 O(n) 时间的遍历

注意1:

ConcurrentLinkedQueue的.size() 是要遍历一遍集合的,很慢的,所以尽量要避免用size,
如果判断队列是否为空最好用isEmpty()而不是用size来判断.

注意问题1:

使用了这个ConcurrentLinkedQueue 类之后是否意味着我们不需要自己进行任何同步或加锁操作了呢?

如果直接使用它提供的函数,比如:queue.add(obj); 或者 queue.poll(obj);,这样我们自己不需要做任何同步。

但如果是非原子操作,比如:

if(!queue.isEmpty()) {       queue.poll(obj);    }

我们很难保证,在调用了isEmpty()之后,poll()之前,这个queue没有被其他线程修改。

所以对于这种情况,我们还是需要自己同步:(http://stackoverflow.com/questions/435069/java-util-concurrentlinkedqueue/435941 )

synchronized(queue) {       if(!queue.isEmpty()) {       queue.poll(obj);      }     }

注意问题2:

public class conn{        public static void main(String[] args) throws Exception{            Queue
queue=new ConcurrentLinkedQueue
(); for(int i=0;i<1000000;i++){ queue.add(String.valueOf(i)); } int num=10;//线程人个数 for(int i=0;i
public class ThreadConn implements Runnable{         Queue
queue; public ThreadConn(Queue
queue){ this.queue=queue; Thread thread=new Thread(this); thread.start(); } public void run(){ try{ long sd=new Date().getTime(); while(queue.poll()!=null){ //这里是业务逻辑 } System.out.println (sn-sd); }catch(Exception e){ e.printStackTrace(); } } }

测试结果如下:

启动10个线程

31
0
0
0
0
500
390
297
0
0
启动1个线程 360

10还没有1个快,如果poll这后的业务逻辑运行时间小的话,多线程序没有任何意义, 反之如果poll这后的业务逻辑运行时间相当于Thread.sleep(1);多线程确实起作用! 

ConcurrentLinkedQueue 是基于链接节点的、线程安全的队列。并发访问不需要同步。因为它在队列的尾部添加元素并从头部删除它们,所以只要不需要知道队列的大小, ConcurrentLinkedQueue 对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列。

/*    * 一个基于链接节点的、无界的、线程安全的队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部    * 是队列中时间最短的元素。新的元素插入到队列的尾部,队列检索操作从队列头部获得元素。当许多线程共享访问一个公共 collection    * 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许 null 元素。    */      private void concurrentLinkedQueueTest() {          ConcurrentLinkedQueue
concurrentLinkedQueue = new ConcurrentLinkedQueue
(); concurrentLinkedQueue.add("a"); concurrentLinkedQueue.add("b"); concurrentLinkedQueue.add("c"); concurrentLinkedQueue.offer("d"); // 将指定元素插入到此队列的尾部。 concurrentLinkedQueue.peek(); // 检索不移除此队列的头,只查看,如果此队列为空,则返回 null。 concurrentLinkedQueue.poll(); // 检索并移除此队列的头,如果此队列为空,则返回 null。 for (String str : concurrentLinkedQueue) { System.out.println(str); } }

 

 

 

 

 

 

 

 

 

转载于:https://my.oschina.net/LucasZhu/blog/1014835

你可能感兴趣的文章
慎用!BLEU评价NLP文本输出质量存在严重问题
查看>>
Node.js 2017企业用户调查结果发布
查看>>
JAVA的优势就是劣势啊!
查看>>
ELK实战之logstash部署及基本语法
查看>>
帧中继环境下ospf的使用(点到点模式)
查看>>
BeanShell变量和方法的作用域
查看>>
LINUX下防恶意扫描软件PortSentry
查看>>
由数据库对sql的执行说JDBC的Statement和PreparedStatement
查看>>
springmvc+swagger2
查看>>
软件评测-信息安全-应用安全-资源控制-用户登录限制(上)
查看>>
我的友情链接
查看>>
Java Web Application 自架构 一 注解化配置
查看>>
如何 debug Proxy.pac文件
查看>>
Python 学习笔记 - 面向对象(特殊成员)
查看>>
Kubernetes 1.11 手动安装并启用ipvs
查看>>
Puppet 配置管理工具安装
查看>>
Bug多,也别乱来,别被Bug主导了开发
查看>>
sed 替换基础使用
查看>>
高性能的MySQL(5)创建高性能的索引一B-Tree索引
查看>>
oracle备份与恢复--rman
查看>>