简介

通过前几篇文章的学习,我想你已经彻底掌握了wait()方法和notify()方法如何使用以及在哪种情况下使用了,本片文章我们将讲解下设计模式中的生产者消费者模式,我将通过手写一份生产者消费者模式的代码,进行讲解。学习领悟了这种设计模式,才可以真正理解消息中间件实现的底层思想,比如Kafka、RocketMQ、RabbitMQ等。

建议收藏:

关于synchronized关键字wait()notify()方法的系列教程,请参考以下文章:

《Java中线程安全和线程不安全解析和示例》

《Java官方文档创建线程的两种方式及优点和缺点分析》

《Java如何正确停止线程(三种场景)》

《设计模式之生产者消费者模式》

《Java多线程wait()和notify()系列方法使用教程》

《Java多线程中notifyAll()方法使用教程》

《Java两个线程交替打印奇偶数(两种方法对比)》

《Java中synchronized实现类锁的两种方式及原理解析》

《Java中synchronized实现对象锁的两种方式及原理解析》

《Java中Synchronized的可重入性和不可中断性的分析和代码验证》

《Java多线程访问Synchronized同步方法的八种使用场景》

一.生产者消费者模式

生产者消费者模式虽然不属于常见的设计模式之一,但也是编程中一种常用的高效的编程模式。所谓设计模式,无非是前辈们实践出来的好用“套路”,我们学习之,可以少走弯路,事半功倍。

生产者就是生产数据的地方,消费者就是消费数据的地方,二者通过这种模式可以达到有效的降低耦合,将生产者和消费者分离开,从而降低边际成本、节省资源、提高工作效率。

举了通俗例子

消费者

就像是网购,作为消费者,我们不需要认识所有卖家,消费者有购买需求,打开购物APP,如果能找到需要的就下单购买即可,如过没有自己需要的,就再等几天。我们不需要跟所有的工厂保持联系,我们只需要关注我们喜欢的商品即可。

生产者

同样作为卖家,我们也不需要认识全天下的消费者,卖家把准备好的商品拍照、打包,并将商品信息同步到购物APP上即可,他不需要一个一个的去问陌生的顾客需不需要,只需要等客户咨询时服务即可。

缓冲区

购物APP就是连接生产者和消费者的缓冲区,缓冲区中的商品是卖家存入的,消费者可以在缓冲区选择和购买自己需要的商品。

有了以上基本的理解后,接下来由浅入深,我们来看一下用Java代码实现一个生产者消费者模式。

二.代码实现

1.生产者

代码描述: 生产者只需负责生产产品,然后将产品放入到缓冲队列Queue中。

解析: 生产者只需要了解产品,然后将产品送入缓存队列中去,它不需要认识消费者。代码阅读起来非常简洁,好维护。

class Producer implements Runnable {
	private Queue queue;

	public Producer(Queue queue) {
		this.queue = queue;
	}

	@Override
	public void run() {
		for (int i = 0; i < 100; i++) {
			queue.put();
		}
	}
}

2.消费者

代码描述: 消费者只需负责从缓存队列Queue中取出产品使用即可。

解析: 消费者无需了解和关联消费者,消费者只跟缓存队列相关联。这样消费者的代码也非常简洁,它无需关系生产者生产产品是何逻辑,也不用跟生产者直接联系。

class Consumer implements Runnable {
	private Queue queue;

	public Consumer(Queue queue) {
		this.queue = queue;
	}

	@Override
	public void run() {
		for (int i = 0; i < 100; i++) {
			queue.take();
		}
	}
}

3.缓存队列

代码描述: 缓存队列Queue需要提供两个接口,一个是允许生产者像队列中存放产品。另一个接口是允许消费者从队列中获取产品。

解析: 缓存队列的核心职责就两个:存和取。从设计思想来说非常简练。

put()方法 : 此方法必须是同步方法,如果有多个生产者,则能保证缓存队列不会爆仓(多个线程存入产品数量大于最大值)。

为什么要加while(MAX_VALUE == list.size())判断,换成if语句可以吗??
答案:不可以换成if语句。此行的含义是当缓存队列达到最大值上限后,禁止再存入产品。while(条件不满足){wait()}是为了防止线程被虚假唤醒。

虚假唤醒:一个线程可以从挂起状态直接变为可运行状态,即时没有被其他线程调用wait()、notify()系列方法、或者被中断、或者等待超时。

虚假唤醒很少发生,但要防范,解决方案是不停的测试线程被唤醒的条件是否满足,不满足则等待,只当满足唤醒条件时,退出循环。

在本例中,put()和take()方法都增加了防止虚假唤醒的代码,即如果条件不满足时,才可以继续存入或提取产品,否则将等待。

class Queue {

	//每次生产最大容量
	private Integer MAX_VALUE = 10;
	private LinkedList<String> list = new LinkedList<>();
	
	// 入库方法:达到最大值后停止生产
	public synchronized void put() {
		while (MAX_VALUE == list.size()) {
			try {
			    //执行到此处时,下面的list.add()将不被执行,直到不满足循环条件
				wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		list.add(UUID.randomUUID().toString());
		System.out.println("生产出:" + list.size() + "个宝贝");
		notify();
	}

	// 出库:库存为0后,停止消费
	public synchronized void take() {

		while (0 == list.size()) {
			try {
			      //执行到此处时,下面的list.poll()将不被执行,直到不满足循环条
				wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println("消费:" + list.poll() + ",剩余宝贝数量:" + list.size());
		notify();
	}

}

4.客户端使用

客户端只需要创建出缓存队列,并且创建出生产者和消费者连个线程运行即可。在客户端使用起来也非常友好,无需复杂的配置,非常条理。这就是设计模式的作用,这也是前辈们的经验总结,所以我们要站在巨人的肩膀上。

public class ProducerConsumerPattern {

	public static void main(String[] args) {
		Queue queue = new Queue();
		Thread consumer = new Thread(new Consumer(queue));
		Thread producer = new Thread(new Producer(queue));
		producer.start();
		consumer.start();
	}
}

总结

本文通过wait()notify()方法,实现和分析消费者和生产者模式,此模式优雅的实现了生产者和消费者的解耦,并且支持并发,一个生产者可以支持多个消费者。一个消费者也可以支持多个生产者。还可以合理利用系统资源,当生产者任务多时,消费者任务少时,系统可将资源多分配给生产者,反之亦然。喜欢本系列教程文章请点赞、关注和收藏。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐