Java/SpringBoot + Kafka + Zookeeper(生产者和消费者实现)
Java/SpringBoot + Kafka + Zookeeper(生产者和消费者实现)
·
这里写目录标题
教程为本地虚拟机安装:通过MobaxTerm连接本地Vmware搭建的Centos7环境
一.Zookeeper安装
1.下载:ZookeeperApache
2.安装
# 1.解压zookeeper到安装目录
tar zxvf apache-zookeeper-3.8.0-bin.tar.gz
mv apache-zookeeper-3.8.0-bin /usr/local/zookeeper
# 2.修改配置文件(复制示例文件),指定持久的zookeeper数据目录,如:/mnt/data/zookeeper
cd /usr/local/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
# 3.启动服务
cd ..
bin/zkServer.sh start conf/zoo.cfg
# 4.查看状态
bin/zkServer.sh status
# 5.客户端连接
bin/zkCli.sh
# 6.连接远程的zookeeper server
bin/zkCli.sh ‐ server ip:port
3.配置文件说明
# The number of milliseconds of each tick
# 最小时间片,单位毫秒
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
# Follower初始化连接Leader超时设置,表示Tick倍数,即:tickTime * initLimit (ms)
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
# Follower与Leader同步数据超时设置,表示Tick倍数,即:tickTime * initLimit (ms)
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# zookeeper 数据存储目录
dataDir=/tmp/zookeeper
# the port at which the clients will connect
# zookeeper 服务端口
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
# 单个客户端与zookeeper最大并发连接数
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
# 保存数据快照数量 超出后回滚
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
# 自动触发清除任务时间间隔 单位小时 默认为 0 表示不清空
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
二.Kafka安装
1.下载:KafkaApache
2.安装
# 1.解压并移动
tar zxvf kafka_2.13-3.2.0.tgz
mv kafka_2.13-3.2.0 /usr/local/kafka
# 2.修改配置文件
cd /usr/local/kafka/
vi config/server.properties
# 3.启动服务
bin/kafka-server-start.sh -daemon config/server.properties
# 4.连接ZK客户端查看Kafka节点
bin/zkCli.sh
ls /brokers/ids
# 5.停止Kafka
bin/kafka-server-stop.sh
3.配置文件说明
# broker.id 属性在kafka集群中必须要是唯一
broker.id=0
# kafka 本机地址和端口
listeners=PLAINTEXT://192.168.184.128:9092
# kafka 的消息存储文件
log.dir=/mnt/data/kafka‐logs
# kafka 连接 zookeeper 的地址
zookeeper.connect=192.168.184.128:2181
4.本机Java服务查看
jps
5.创建主题
# 1.创建只有一个Partition 且备份因子数也设置为1的主题
bin/kafka-topics.sh --bootstrap-server 192.168.184.128:9092 --create --replication-factor 1 --partitions 1 --topic moon
# 2.查看Topic列表
bin/kafka-topics.sh --bootstrap-server 192.168.184.128:9092 --list
# 3.删除主题
bin/kafka-topics.sh --bootstrap-server 192.168.184.128:9092 --delete --topic moon
6.收发消息
# 1.生产者
bin/kafka-console-producer.sh --broker-list 192.168.184.128:9092 --topic moon
>i am msg
>this is a fish
>exit
>quit
# 2.消费者(默认为开始消费当前之后的数据)
bin/kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092 --topic moon
# 3.从头消费
bin/kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092 --from-beginning --topic moon
7.Kafka集群
通过创建多个Broker实例演示
# 1.复制服务端配置文件
cp server.properties server_1.properties
cp server.properties server_2.properties
# 2.编辑 broker.id 分别设为1、2
# 修改日志目录 /mnt/data/kafka‐logs-1 /mnt/data/kafka‐logs-2
# 端口 9093 9094
vi server_1.properties
vi server_2.properties
# 3.启动服务
bin/kafka-server-start.sh -daemon config/server_1.properties
bin/kafka-server-start.sh -daemon config/server_2.properties
查看Zookeeper注册节点信息
bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
Topic创建和查看
# 1.创建一个副本数为 3 和 分区数为 2 的主题
bin/kafka-topics.sh --bootstrap-server 192.168.184.128:9092 --create --replication-factor 3 --partitions 2 --topic sun
# 2.查看Topic信息
bin/kafka-topics.sh --bootstrap-server 192.168.184.128:9092 --describe --topic sun
# 3.向集群发送消息
bin/kafka-console-producer.sh --broker-list 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --topic sun
# 4.消费
bin/kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --from-beginning --topic sun
三.Java客户端访问Kafka(关闭服务器防火墙)
1.依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.5.graal</version>
</dependency>
2.生产者
package com.example.kafkatest.kafka;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* Description: Kafka 生产者
*
* @Author:
* @Date: 2022-06-05 0:41
* @version: V1.0.0
*/
public class KafkaProducer {
private final static String TOPIC_NAME = "moon";
/**
* ACKS_CONFIG 消息持久化
* acks=0 Producer 不需等待 Broker 回复 直接发送下一条
* acks=1 Producer 至少 Leader 将数据写入本地 Log 不需等待 Follower 是否成功写入 发送下一条
* acks=-1或all Producer 等待 min.insync.replicas 个 副本写入日志 才能发送下一条 一般金融级别的服务才会开启此配置
*
* RETRIES_CONFIG 重试
* 默认重试间隔100ms 重试能保证消息发送的可靠性 但网络抖动可能造成消息重复发送 需要在消费端保证幂等
*
* RETRY_BACKOFF_MS_CONFIG 重试间隔
*
* BUFFER_MEMORY_CONFIG 发送消息的本地缓冲区 提高消息发送性能 默认值是33554432 32MB
*
* BATCH_SIZE_CONFIG kafka本地线程会从缓冲区取数据,批量发送到broker
* 设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
*
* LINGER_MS_CONFIG 默认 0 立即发送消息 一般设为10毫秒 10ms内数据满了一个batch立即发送 未满则到了10ms也发送避免消息延迟
*
* @param args
* @throws InterruptedException
* @throws ExecutionException
*/
public static void main(String[] args) throws InterruptedException, ExecutionException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.184.128:9092,192.168.184.128:9093");
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
//把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
int msgNum = 5;
final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
for (int i = 1; i <= msgNum; i++) {
MsgCon order = new MsgCon(i, 100 + i, 1, 1000.00);
//指定发送分区
/*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
, 0, order.getOrderId().toString(), JSON.toJSONString(order));*/
//未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME
, order.getOrderId().toString(), JSON.toJSONString(order));
//等待消息发送成功的同步阻塞方法
/*RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());*/
//异步回调方式发送消息
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("消息发送失败...");
}
if (metadata != null) {
System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
countDownLatch.countDown();
}
});
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.close();
}
}
3.消费者
package com.example.kafkatest.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* Description: Kafka消费者
*
* @Author:
* @Date: 2022-06-05 0:47
* @version: V1.0.0
*/
public class KafkaConsumers {
private final static String TOPIC_NAME = "moon";
private final static String CONSUMER_GROUP_NAME = "testGroup";
/**
* AUTO_OFFSET_RESET_CONFIG 消费位置
* latest(默认) :只消费自己启动之后发送到主题的消息
* earliest:第一次从头开始消费,以后按照消费offset记录继续消费
*
* HEARTBEAT_INTERVAL_MS_CONFIG Consumer 给 Broker发送心跳的间隔时间
*
* SESSION_TIMEOUT_MS_CONFIG Consumer 超时故障
* 服务端broker多久感知不到一个 Consumer 心跳就认为他故障了 会将其踢出消费组 对应的 Partition 也会被重新分配给其他 Consumer 默认是10秒
*
*
*
* @param args
*/
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.184.128:9092,192.168.184.128:9093");
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
// 是否自动提交 Offset,默认就是 true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交 Offset 的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 消费位置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 心跳间隔
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
// 故障判断
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
// 一次 Poll 最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// 如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// 消费指定分区
//consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
}
}
四.SpringBoot 继承Kafka(关闭服务器防火墙)
1.依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.6</version>
<type>pom</type>
</dependency>
2.配置文件
server:
port: 8080
spring:
kafka:
bootstrap-servers: 192.168.184.128:9092
producer:
retries: 3 #重试次数
batch-size: 16384
buffer-memory: 33554432
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual_immediate
3.生产者
package com.example.kafkatest.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
/**
* Description: 生产者
*
* @Author:
* @Date: 2022-06-05 14:28
* @version: V1.0.0
*/
@RestController
@RequestMapping(("/send"))
public class Send {
private static final String TOPIC = "moon";
private KafkaTemplate<String,String> kafkaTemplate;
@Autowired
public void setKafkaTemplate(KafkaTemplate<String,String> kafkaTemplate){
this.kafkaTemplate = kafkaTemplate;
}
@GetMapping("/producer")
public void send(@RequestParam("msg") String msg){
kafkaTemplate.send(TOPIC,0,"KEY",msg);
}
}
4.消费者
package com.example.kafkatest.controller;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* Description: 消费者
*
* @Author:
* @Date: 2022-06-05 14:40
* @version: V1.0.0
*/
@Component
public class SendConsumer {
@KafkaListener(topics = "moon",groupId = "group_1")
public void listen_1(ConsumerRecord<String,String> record, Acknowledgment ack){
String msg = record.value();
System.out.println("listen_1:" + msg);
ack.acknowledge();
}
@KafkaListener(topics = "moon",groupId = "group_2")
public void listen_2(ConsumerRecord<String,String> record, Acknowledgment ack){
String msg = record.value();
System.out.println("listen_2:" + msg);
ack.acknowledge();
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)