文
章
目
录
章
目
录
异步模式
传统版
异步模式之生产者/消费者:
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断 防止虚假唤醒
while(number != 0) {
// 等待不能生产
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断 防止虚假唤醒
while(number == 0) {
// 等待不能消费
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class TraditionalProducerConsumer {
public static void main(String[] args) {
ShareData shareData = new ShareData();
// t1线程,生产
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.increment();
}
}, "t1").start();
// t2线程,消费
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.decrement();
}
}, "t2").start();
}
}
改进版
异步模式之生产者/消费者:
- 消费队列可以用来平衡生产和消费的线程资源,不需要产生结果和消费结果的线程一一对应
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
public class demo {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id,"值"+id));
}, "生产者" + i).start();
}
new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
Message message = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消费者").start();
}
}
//消息队列类,Java间线程之间通信
class MessageQueue {
private LinkedList<Message> list = new LinkedList<>();//消息的队列集合
private int capacity;//队列容量
public MessageQueue(int capacity) {
this.capacity = capacity;
}
//获取消息
public Message take() {
//检查队列是否为空
synchronized (list) {
while (list.isEmpty()) {
try {
sout(Thread.currentThread().getName() + ":队列为空,消费者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//从队列的头部获取消息返回
Message message = list.removeFirst();
sout(Thread.currentThread().getName() + ":已消费消息--" + message);
list.notifyAll();
return message;
}
}
//存入消息
public void put(Message message) {
synchronized (list) {
//检查队列是否满
while (list.size() == capacity) {
try {
sout(Thread.currentThread().getName()+":队列为已满,生产者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//将消息加入队列尾部
list.addLast(message);
sout(Thread.currentThread().getName() + ":已生产消息--" + message);
list.notifyAll();
}
}
}
final class Message {
private int id;
private Object value;
//get set
}
阻塞队列
public static void main(String[] args) {
ExecutorService consumer = Executors.newFixedThreadPool(1);
ExecutorService producer = Executors.newFixedThreadPool(1);
BlockingQueue<Integer> queue = new SynchronousQueue<>();
producer.submit(() -> {
try {
System.out.println("生产...");
Thread.sleep(1000);
queue.put(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
consumer.submit(() -> {
try {
System.out.println("等待消费...");
Integer result = queue.take();
System.out.println("结果为:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}