文
章
目
录
章
目
录
在Java中,负责生产数据的是生产者,负责使用数据的是消费者。没有数据时,消费者等待;数据满时,生产者等待。
生产者-消费者模型(Producer-Consumer problem)是一个非常经典的多线程并发协作的模型。
比如某个模块负责生产数据,而另一个模块负责处理数据。产生数据的模块就形象地被称为生产者;而处理数据的模块,则被称为消费者。
生产者和消费者在同一段时间内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
生产者-消费者模式所遵循的规则
- 生产者仅仅在缓冲区未满时生产,缓冲区满则停止生产。
- 消费者仅仅在缓冲区有产品时才能消费,缓冲区为空则停止消费。
- 当消费者发现缓冲区没有可消费的产品时会通知生产者。
- 当生产者生产出可消费的产品时,应该通知等待的消费者去消费。
生产者-消费者模型的实现
生产者-消费者模型的实现可以有多重实现方式,这里我们重点介绍3种,对于初学者,可以先重点掌握第2种方式。
1、通过阻塞队列方式实现
public class ProducerConsumerDemo1 {
/**
* 缓冲队列
*/
private final ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
/**
* 生产者
*/
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.put(i);
}
}
}
/**
* 消费者
*/
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
}
}
}
public Producer getProducer() {
return new Producer();
}
public Consumer getConsumer() {
return new Consumer();
}
public static void main(String[] args) {
ProducerConsumerDemo1 producerConsumerDemo1 = new ProducerConsumerDemo1();
new Thread(producerConsumerDemo1.getProducer()).start();
new Thread(producerConsumerDemo1.getConsumer()).start();
}
}
2、通过wait和notifyAll来实现
/**
* 缓冲区(仓库)
*/
private final List<Object> list = new ArrayList<>();
private final int bufferCount = 10;
public final Object lock = new Object();
/**
* 生产者
*/
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
while (list.size() >= bufferCount) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//仓库未满,继续生产产品
list.add(new Object());
//唤醒消费者去消费产品
lock.notifyAll();
}
}
}
}
/**
* 消费者
*/
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
while (list.size() == 0) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//仓库不是空,继续消费
list.remove(0);
//唤醒生产者去生产产品
lock.notifyAll();
}
}
}
}
public Producer getProducer(){
return new Producer();
}
public Consumer getConsumer(){
return new Consumer();
}
public static void main(String[] args) {
ProducerConsumerDemo2 producerConsumerDemo2=new ProducerConsumerDemo2();
new Thread(producerConsumerDemo2.getProducer()).start();
new Thread(producerConsumerDemo2.getConsumer()).start();
}
}
注意:为什么使用notifyAll()而不使用notify()?
因为假设只有一个生产者和消费者:那么生产者和消费者将按顺序执行。
但是如果有多个生产者和消费者,那么可能出现假死现象:
- 1)一个消费者唤醒了一个生产者,但是在这个生产者拿到锁之前,另一个消费者抢先拿到了锁;
- 2)三个生产者全部等待,某个消费者唤醒的不是生产者,而是另一个消费者;
而解决上述假死现象的方法是使用notifyAll()替换notify(),保证消费者唤醒了生产者,生产者唤醒了消费者。
3、通过ReentrantLock和Condition来实现
public class ProducerConsumerDemo3 {
/**
* 缓冲区(仓库)
*/
private final List<Object> list = new ArrayList<>();
/**
* 缓冲区大小
*/
private final int bufferCount = 10;
public ReentrantLock lock = new ReentrantLock();
//创建两个条件变量
private final Condition condition1 = lock.newCondition();
private final Condition condition2 = lock.newCondition();
/**
* 生产者
*/
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);//模拟生产操作
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
lock.lock();
while (list.size() >= bufferCount) {
condition1.await();//当仓库数据数量超过缓冲区设定的最大数量,则让生产线程进入等待状态
}
list.add(new Object());
System.out.println(Thread.currentThread().getName() + "-生产者生产,数量为:" + list.size());
condition2.signal();//唤醒消费线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
lock.lock();
while (list.size() == 0) {
condition2.await();//当仓库中数据为空时,则让消费线程进入等待状态
}
list.remove(0);
System.out.println(Thread.currentThread().getName() + "-消费者消费,数量为:" + list.size());
condition1.signal();//唤醒生产线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
public Producer getProducer() {
return new Producer();
}
public Consumer getConsumer() {
return new Consumer();
}
public static void main(String[] args) {
ProducerConsumerDemo3 producerConsumerDemo3 = new ProducerConsumerDemo3();
new Thread(producerConsumerDemo3.getProducer()).start();
new Thread(producerConsumerDemo3.getConsumer()).start();
}
}
总结
以上就是Java多线程:线程间通信(生产者消费者模式)的全部内容。