天增的博客
首页
博客
  • 分布式解决方案
  • Java并发工具包
  • redis
  • LeetCode
  • 系统设计
  • JVM体系
Github (opens new window)
Rss (opens new window)
  • zh-CN
  • en-US
首页
博客
  • 分布式解决方案
  • Java并发工具包
  • redis
  • LeetCode
  • 系统设计
  • JVM体系
Github (opens new window)
Rss (opens new window)
  • zh-CN
  • en-US
  • Java并发工具包
  • 并发基础
    • 线程基础
      • Thread的状态
      • 进程与线程
      • 正确停止线程的方式
      • Thread的实现方式
      • waitnotifynotifyAll
      • 生产者消费者模型
        • 生产者消费者模式
        • 使用 BlockingQueue 实现生产者消费者模式
        • 使用 Condition 实现生产者消费者模式
        • 使用 wait/notify 实现生产者消费者模式
    • 线程安全
      • 线程不安全
      • 线程安全
      • 需要注意线程安全问题的情况
  • 并发工具
    • 线程协作
      • Semaphore信号量
      • CountDownLatch详解
      • 使用CompletableFuture解决旅游平台问题
      • 使用CyclicBarrier解决团建问题
    • Future
      • Future主要功能
      • FutureTask源码分析
    • ThreadLocal
      • ThreadLocal内存泄漏
      • ThreadLocal使用场景
    • 原子类
      • 原子类的作用概览
      • 原子类的性能分析
    • 阻塞队列
      • 常见的阻塞队列
      • 阻塞队列的常用方法
      • 什么是阻塞队列
    • 并发容器
      • HashMap
      • CopyOnWriteArrayList
      • ConcurrentHashMap详解
    • 线程池
      • 为什么多线程会带来性能问题
      • 线程池的优势
      • 创建线程池的参数
        • 如何设置线程数
      • 线程池线程复用原理
      • ForkJoin框架
    • 各种锁
      • 锁的种类和特点
        • 公平锁非公平锁
        • 自旋锁非自旋锁
        • 共享锁独占锁
        • 乐观锁和悲观锁
      • JVM锁优化
      • synchronized和Lock的对比
      • lock的常用方法
  • 底层原理
    • CAS原理
    • AQS框架
    • 伪共享
    • java内存模型
      • Java内存模型介绍
      • happens-before规则
  • topic
  • Java并发工具包
  • 并发基础
  • 线程基础
  • 生产者消费者模型
2022-04-21
目录

生产者消费者模型

# 生产者消费者模型

# 生产者消费者模式

生产者消费者,是在软件开发中很常见的一种设计模式,大致结构如下图

生产者消费者模型

生产者和消费者最核心的就是那个队列,用于平衡生产者生产速度和消费者消费速度不一致

  1. 在队列满了之后,生产者则会阻塞,在队列空了之后,消费者则会阻塞。
  2. 队列非空组则提醒消费者继续消费,队列非慢则提醒生产者继续生产

# 使用 BlockingQueue 实现生产者消费者模式

代码很简单

就是创建两个消费者线程和两个生产者线程,通过BlockQueue这个中间媒介,时期不断的进行生产-> 消费的循环

public static void main(String[] args) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
        Runnable producer = () -> {
            while (true) {
                try {
                    queue.put(new Object());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        new Thread(producer).start();
        new Thread(producer).start();
        Runnable consumer = () -> {
            while (true) {
                try {
                    queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        };
        new Thread(consumer).start();
        new Thread(consumer).start();
    }

# 使用 Condition 实现生产者消费者模式

我们利用lock的Condition来实现一个简易版的BlockingQueue

public static class MyBlockingQueueForCondition {
        private Queue queue;
        private int max = 16;
        private ReentrantLock lock = new ReentrantLock();
        private Condition notFull = lock.newCondition();
        private Condition notEmpty = lock.newCondition();

        public MyBlockingQueueForCondition(int max) {
            this.max = max;
            queue = new LinkedList();
        }

        public void put(Object v) throws InterruptedException {
            lock.lock();
            try {
                while (queue.size() == max) {
                    notFull.await();
                }
                queue.add(v);
                notEmpty.signalAll();
            } finally {
                lock.unlock();
            }
        }

        public Object take() throws InterruptedException {
            lock.lock();
            try {
                while (queue.size() == 0) {
                    notEmpty.await();
                }
                Object o = queue.remove();
                notFull.signalAll();
                return o;
            } finally {
                lock.unlock();
            }
        }
    }

最灵魂的操作是使用while循环来判断临界情况 ,为什么不用if来进行判断?

在多个线程进入put操作的时候,发现队列已经满了,多个线程都进入等待状态,然后在notFull.signalAll()的时候多个线程都会调用add(v)操作,导致队列中的数量大于max的限定值;反之,同理。

# 使用 wait/notify 实现生产者消费者模式

使用wait/notify的方式,是使用lock的方式相似。

class MyBlockingQueue {

   private int maxSize;

   private LinkedList<Object> storage;

   public MyBlockingQueue(int size) {

       this.maxSize = size;

       storage = new LinkedList<>();

   }

   public synchronized void put() throws InterruptedException {
       while (storage.size() == maxSize) {
           wait();
       }
       storage.add(new Object());
       notifyAll();
   }

   public synchronized void take() throws InterruptedException {
       while (storage.size() == 0) {
           wait();
       }
       storage.poll();
       notifyAll();
   }

}
最近更新
01
以 root 身份启动 transmission-daemon
12-13
02
Debian系统安装qbittorrent-nox
12-09
03
LXC Debain12安装zerotier并实现局域网自动nat转发
07-29
更多文章>
Theme by Vdoing | Copyright © 2015-2024 天增 | 苏ICP备16037388号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式