1、Kafka 基础知识
本栏目讲解 kafka 相关的知识,包括简介、集群部署、架构及 spring 整合文章目录简介1、概述2、优点3、模式4、高效读写集群部署操作命令简介1、概述:一个分布式的基于发布/订阅模式的消息队列2、优点解耦异步通信峰值处理缓冲可恢复性3、模式点对点模式发布/订阅模式4、高效读写顺序写磁盘:写数据过程是追加到log文件末端零拷贝:直接从系统内存中返回给客户端集群部署# 解压tar -zxvf
·
本栏目讲解 kafka 相关的知识,包括简介、集群部署、架构及 spring 整合
简介
1、概述
- :一个分布式的基于发布/订阅模式的消息队列
2、优点
- 解耦
- 异步通信
- 峰值处理
- 缓冲
- 可恢复性
3、模式
- 点对点模式
- 发布/订阅模式
4、高效读写
- 顺序写磁盘:写数据过程是追加到log文件末端
- 零拷贝:直接从系统内存中返回给客户端
集群部署
# 解压
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
# 修改解压后的文件夹名称
mv kafka_2.11-0.11.0.0/ kafka
# 修改配置文件
vim config/server.properties
###############################################
# broker 的全局唯一编号,不能重复
broker.id=0
# 删除 topic 功能使能
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的现成数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
# topic 在当前 broker 上的分区个数
num.partitions=1
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
# 配置连接 Zookeeper 集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
# 注意:配置其他的broker时,其broker.id不同
##############################################
# 配置环境变量
vim /etc/profile
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
# 启动zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
# 关闭kafka
bin/kafka-server-stop.sh stop
操作命令
# 查看所有topic
bin/kafka-topic.sh --zookeeper hadoop102:2181 --list
# 创建topic
bin/kafka-topic.sh --zookeeper hadoop102:2181 --create --replication-factor 3
--partitions --topic topicName
# 删除topic
bin/kafka-topic.sh --zookeeper hadoop102:2181 --delete --topic topicName
# 发送消息
bin/kafka-console-producer.sh --broker-list hadoop102:2181 --topic topicName
# 消费消息
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic topicName
Spring 整合 Kafka
导入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
</dependency>
配置信息
spring:
kafka:
producer:
bootstrap-servers: 127.0.0.1:9092
transaction-id-prefix: kafka_tx.
consumer:
bootstrap-servers: 127.0.0.1:9092
消息监听
/**
* 监听消息
*/
@Component
public class ConsumerListener {
@KafkaListener(topics = "msg1", groupId = "group.msg")
public String messageListener2(String message, Acknowledgment acknowledgment) {
System.out.println("获取到的数据:" + message);
acknowledgment.acknowledge();
return message;
}
@KafkaListener(topics = "msg2", topicPartitions = {@TopicPartition(topic = "msg2",
partitions = {"0"})}, containerFactory = "kafkaListenerContainerFactory")
public void messageListener1(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
System.out.println("获取到的数据:" + record.key() + "=" + record.value());
acknowledgment.acknowledge();
}
}
发送消息
@SpringBootTest(classes = KafkaDemoApplication.class)
class PublishMessageTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static final String MSG1 = "msg1";
private static final String MSG2 = "msg2";
/**
* 异步发送不考虑回调,常用作收集操作日志、用户浏览痕迹等不重要的信息
*/
@Test
public void asynPublishMessage1() {
kafkaTemplate.send(MSG1, "Hello");
}
/**
* 异步发送带回调
*/
@Test
public void asynPublishMessage2() {
ProducerRecord<String, String> record = new ProducerRecord<>(MSG2, "key1", "value1");
kafkaTemplate.send(record).addCallback(
new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送消息失败,失败原因:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("发送消息成功," + result);
}
});
}
/**
* 同步发送
*/
@Test
public void syncPublishMessage() {
try {
final SendResult<String, String> result = kafkaTemplate.send(MSG1, "Java").get();
System.out.println("发送结果:" + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* 事务可以保证消息不丢失也不重复
*/
@Test
@Transactional
public void transactionPublishMessage() {
kafkaTemplate.send(MSG1, "World");
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)