博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java并发编程(二十四)----(JUC集合)ArrayBlockingQueue和LinkedBlockingQueue介绍
阅读量:2212 次
发布时间:2019-05-06

本文共 6735 字,大约阅读时间需要 22 分钟。

这一节我们来了解阻塞队列(BlockingQueue),BlockingQueue接口定义了一种阻塞的FIFO queue,每一个BlockingQueue都有一个容量,当容量满时往BlockingQueue中添加数据时会造成阻塞,当容量为空时取元素操作会阻塞。首先我们来看ArrayBlockingQueue和LinkedBlockingQueue.

ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。

我们看他的构造函数实现:

//默认是非公平的,初始指定队列容量public ArrayBlockingQueue(int capacity) {       this(capacity, false);   }//该构造方法可以设置队列的公平性。当然如果为公平的,则对性能会产生影响//访问者的公平性是使用可重入锁实现的public ArrayBlockingQueue(int capacity, boolean fair) {       if (capacity <= 0)           throw new IllegalArgumentException();       this.items = new Object[capacity];       lock = new ReentrantLock(fair);       notEmpty = lock.newCondition();       notFull =  lock.newCondition();   }

使用很简单我们直接看一个实例:

public class ProducerConsumerTest {   public static void main(String[] args) {        final BlockingQueue
blockingQueue = new ArrayBlockingQueue
(3); ExecutorService service = Executors.newFixedThreadPool(10); for(int i = 0;i<4;i++){ service.execute(new ProducerAndConsumer(blockingQueue)); } }}class ProducerAndConsumer implements Runnable{ private boolean flag = false; private Integer j = 1; private Lock lock = new ReentrantLock(); Condition pro_con = lock.newCondition(); Condition con_con = lock.newCondition(); private BlockingQueue
blockingQueue; public ProducerAndConsumer(BlockingQueue
blockingQueue){ this.blockingQueue= blockingQueue; } //生产 public void put(){ try { lock.lock(); while(flag) pro_con.await(); System.out.println("正在准备放入数据。。。"); Thread.sleep(new Random().nextInt(10)*100); Integer value = new Random().nextInt(30); blockingQueue.put(value); System.out.println(Thread.currentThread().getName()+" 放入的数据 "+value); flag = true; con_con.signal(); } catch (Exception e) { e.printStackTrace(); } finally{ lock.unlock(); } } public void get(){ try { lock.lock(); while(!flag) con_con.await(); System.out.println("正在准备取数据。。。"); Thread.sleep(new Random().nextInt(10)*1000); System.out.println(Thread.currentThread().getName()+" 取到的数据为"+blockingQueue.take()); flag = false; pro_con.signal(); } catch (Exception e) { e.printStackTrace(); } finally{ lock.unlock(); } } @Override public void run() { while(true){ if(j==1){ put(); } else{ get(); } j=(j+1)%2; } }}

输出为:

正在准备放入数据。。。正在准备放入数据。。。正在准备放入数据。。。正在准备放入数据。。。pool-1-thread-2   放入的数据    13正在准备取数据。。。pool-1-thread-3   放入的数据    4正在准备取数据。。。pool-1-thread-3   取到的数据为13正在准备放入数据。。。pool-1-thread-1   放入的数据    11正在准备取数据。。。pool-1-thread-4   放入的数据    26正在准备取数据。。。pool-1-thread-1   取到的数据为4正在准备放入数据。。。pool-1-thread-2   取到的数据为11正在准备放入数据。。。pool-1-thread-3   放入的数据    18正在准备取数据。。。......

LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

先看一下他的构造函数:

public LinkedBlockingQueue() {  this(Integer.MAX_VALUE);  //MAX_VALUE=2147483647 }public LinkedBlockingQueue(int capacity) {     if (capacity <= 0) throw new IllegalArgumentException();     this.capacity = capacity;     last = head = new Node
(null); }

我们还是直接开看一个例子:

public class BlockingQueueTest {
/** * 定义装苹果的篮子 */ public static class Basket {
// 篮子,能够容纳3个苹果 // BlockingQueue
basket = new ArrayBlockingQueue
(3); BlockingQueue
basket = new LinkedBlockingQueue
(3); // 生产苹果,放入篮子 public void produce() throws InterruptedException { // put方法放入一个苹果,若basket满了,等到basket有位置 basket.put("An apple"); } // 消费苹果,从篮子中取走 public String consume() throws InterruptedException { // get方法取出一个苹果,若basket为空,等到basket有苹果为止 return basket.take(); } } //  测试方法 public static void testBasket() { // 建立一个装苹果的篮子 final Basket basket = new Basket(); // 定义苹果生产者 class Producer implements Runnable { public String instance = ""; public Producer(String a) { instance = a; } public void run() { try { while (true) { // 生产苹果 System.out.println("生产者准备生产苹果:" + instance); basket.produce(); System.out.println("! 生产者生产苹果完毕:" + instance); // 休眠300ms Thread.sleep(300); } } catch (InterruptedException ex) { } } } // 定义苹果消费者 class Consumer implements Runnable { public String instance = ""; public Consumer(String a) { instance = a; } public void run() { try { while (true) { // 消费苹果 System.out.println("消费者准备消费苹果:" + instance); basket.consume(); System.out.println("! 消费者消费苹果完毕:" + instance); // 休眠1000ms Thread.sleep(1000); } } catch (InterruptedException ex) { } } } ExecutorService service = Executors.newCachedThreadPool(); Producer producer = new Producer("P1"); Producer producer2 = new Producer("P2"); Consumer consumer = new Consumer("C1"); service.submit(producer); service.submit(producer2); service.submit(consumer); // 程序运行3s后,所有任务停止 try { Thread.sleep(3000); } catch (InterruptedException e) { } service.shutdownNow(); } public static void main(String[] args) { BlockingQueueTest.testBasket(); }}

输出为:

生产者准备生产苹果:P1消费者准备消费苹果:C1! 生产者生产苹果完毕:P1生产者准备生产苹果:P2! 消费者消费苹果完毕:C1! 生产者生产苹果完毕:P2生产者准备生产苹果:P2! 生产者生产苹果完毕:P2生产者准备生产苹果:P1! 生产者生产苹果完毕:P1生产者准备生产苹果:P2生产者准备生产苹果:P1消费者准备消费苹果:C1! 消费者消费苹果完毕:C1! 生产者生产苹果完毕:P2生产者准备生产苹果:P2消费者准备消费苹果:C1! 消费者消费苹果完毕:C1! 生产者生产苹果完毕:P1生产者准备生产苹果:P1消费者准备消费苹果:C1! 消费者消费苹果完毕:C1! 生产者生产苹果完毕:P2Process finished with exit code 0

转载于:https://www.cnblogs.com/rickiyang/p/11074246.html

你可能感兴趣的文章
搞懂分布式技术15:缓存更新的套路
查看>>
搞懂分布式技术16:浅谈分布式锁的几种方案
查看>>
搞懂分布式技术17:浅析分布式事务
查看>>
搞懂分布式技术18:分布式事务常用解决方案
查看>>
搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务
查看>>
搞懂分布式技术20:消息队列因何而生
查看>>
搞懂分布式技术21:浅谈分布式消息技术 Kafka
查看>>
后端技术杂谈1:搜索引擎基础倒排索引
查看>>
后端技术杂谈2:搜索引擎工作原理
查看>>
后端技术杂谈3:Lucene基础原理与实践
查看>>
后端技术杂谈4:Elasticsearch与solr入门实践
查看>>
后端技术杂谈5:云计算的前世今生
查看>>
后端技术杂谈6:白话虚拟化技术
查看>>
后端技术杂谈7:OpenStack的基石KVM
查看>>
后端技术杂谈8:OpenStack架构设计
查看>>
后端技术杂谈9:先搞懂Docker核心概念吧
查看>>
后端技术杂谈10:Docker 核心技术与实现原理
查看>>
夯实Java基础系列2:Java自动拆装箱里隐藏的秘密
查看>>
夯实Java基础系列1:Java面向对象三大特性(基础篇)
查看>>
夯实Java基础系列3:一文搞懂String常见面试题,从基础到实战,更有原理分析和源码解析!
查看>>