Java多线程:线程间通信(生产者消费者模式)

培训教学 潘老师 6个月前 (11-09) 125 ℃ (0) 扫码查看

在Java中,负责生产数据的是生产者,负责使用数据的是消费者。没有数据时,消费者等待;数据满时,生产者等待。

生产者-消费者模型(Producer-Consumer problem)是一个非常经典的多线程并发协作的模型。

比如某个模块负责生产数据,而另一个模块负责处理数据。产生数据的模块就形象地被称为生产者;而处理数据的模块,则被称为消费者。

生产者和消费者在同一段时间内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。生产者-消费者模式示意图

生产者-消费者模式所遵循的规则

  • 生产者仅仅在缓冲区未满时生产,缓冲区满则停止生产。
  • 消费者仅仅在缓冲区有产品时才能消费,缓冲区为空则停止消费。
  • 当消费者发现缓冲区没有可消费的产品时会通知生产者。
  • 当生产者生产出可消费的产品时,应该通知等待的消费者去消费。

生产者-消费者模型的实现

生产者-消费者模型的实现可以有多重实现方式,这里我们重点介绍3种,对于初学者,可以先重点掌握第2种方式。

1、通过阻塞队列方式实现

public class ProducerConsumerDemo1 {
 
    /**
     * 缓冲队列
     */
    private final ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
 
    /**
     * 生产者
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.put(i);
            }
        }
    }
 
    /**
     * 消费者
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
            }
        }
    }
 
    public Producer getProducer() {
        return new Producer();
    }
 
    public Consumer getConsumer() {
        return new Consumer();
    }
 
 
    public static void main(String[] args) {
        ProducerConsumerDemo1 producerConsumerDemo1 = new ProducerConsumerDemo1();
        new Thread(producerConsumerDemo1.getProducer()).start();
        new Thread(producerConsumerDemo1.getConsumer()).start();
    }
}

2、通过wait和notifyAll来实现

    /**
     * 缓冲区(仓库)
     */
    private final List<Object> list = new ArrayList<>();
 
    private final int bufferCount = 10;
 
    public final Object lock = new Object();
 
    /**
     * 生产者
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
 
                synchronized (lock) {
                    while (list.size() >= bufferCount) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    //仓库未满,继续生产产品
                    list.add(new Object());
                    //唤醒消费者去消费产品
                    lock.notifyAll();
                }
            }
        }
    }
 
    /**
     * 消费者
     */
    class Consumer implements Runnable {
 
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
 
                synchronized (lock) {
                    while (list.size() == 0) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    //仓库不是空,继续消费
                    list.remove(0);
                    //唤醒生产者去生产产品
                    lock.notifyAll();
                }
            }
        }
    }
 
    public Producer getProducer(){
        return new Producer();
    }
 
    public Consumer getConsumer(){
        return new Consumer();
    }
 
    public static void main(String[] args) {
        ProducerConsumerDemo2 producerConsumerDemo2=new ProducerConsumerDemo2();
        new Thread(producerConsumerDemo2.getProducer()).start();
        new Thread(producerConsumerDemo2.getConsumer()).start();
    }
}

注意:为什么使用notifyAll()而不使用notify()?

因为假设只有一个生产者和消费者:那么生产者和消费者将按顺序执行。

但是如果有多个生产者和消费者,那么可能出现假死现象:

  • 1)一个消费者唤醒了一个生产者,但是在这个生产者拿到锁之前,另一个消费者抢先拿到了锁;
  • 2)三个生产者全部等待,某个消费者唤醒的不是生产者,而是另一个消费者;

而解决上述假死现象的方法是使用notifyAll()替换notify(),保证消费者唤醒了生产者,生产者唤醒了消费者。

3、通过ReentrantLock和Condition来实现

public class ProducerConsumerDemo3 {
 
    /**
     * 缓冲区(仓库)
     */
    private final List<Object> list = new ArrayList<>();
    /**
     * 缓冲区大小
     */
    private final int bufferCount = 10;
    public ReentrantLock lock = new ReentrantLock();
    //创建两个条件变量
    private final Condition condition1 = lock.newCondition();
    private final Condition condition2 = lock.newCondition();
 
 
    /**
     * 生产者
     */
    class Producer implements Runnable {
 
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);//模拟生产操作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
 
                try {
                    lock.lock();
                    while (list.size() >= bufferCount) {
                        condition1.await();//当仓库数据数量超过缓冲区设定的最大数量,则让生产线程进入等待状态
                    }
 
                    list.add(new Object());
                    System.out.println(Thread.currentThread().getName() + "-生产者生产,数量为:" + list.size());
                    condition2.signal();//唤醒消费线程
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
 
            }
        }
    }
 
    class Consumer implements Runnable {
 
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
 
                try {
                    lock.lock();
                    while (list.size() == 0) {
                        condition2.await();//当仓库中数据为空时,则让消费线程进入等待状态
                    }
                    list.remove(0);
                    System.out.println(Thread.currentThread().getName() + "-消费者消费,数量为:" + list.size());
                    condition1.signal();//唤醒生产线程
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
 
    public Producer getProducer() {
        return new Producer();
    }
 
    public Consumer getConsumer() {
        return new Consumer();
    }
 
    public static void main(String[] args) {
        ProducerConsumerDemo3 producerConsumerDemo3 = new ProducerConsumerDemo3();
        new Thread(producerConsumerDemo3.getProducer()).start();
        new Thread(producerConsumerDemo3.getConsumer()).start();
    }
}

 总结

以上就是Java多线程:线程间通信(生产者消费者模式)的全部内容。


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/teach/10921.html
喜欢 (0)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】