go 操作 kafka 实现发送和订阅
Zookeeper startcp conf/zoo_sample.cfg conf/zoo.cfgbin/zkServer.sh start2181 启动bin/zkCli.sh -server 127.0.0.1:2181 进入终端分布式配置 https://blog.csdn.net/java_66666/article/details/81015302kafkabin/kafka-serv
·
消息队列
- 一对一
- 一对多 一个生产者多个消费者
- 消息主动推送给消费者(推送能力一样,但是每个消费者的消费能力不一样)
- 消费者主动去拉去消息(需要长期去轮询查询是否有消息) kafak
kafka架构
- leader 消费者找leader要消息
- follwer 用于集群间的数据同步备份
- 同一个分区消息只能被同一个消费组里的某一个消费者消费
安装kafka
kafak依赖zookeeper 需要先启动zk(集群)
zookeeper 启动
cp conf/zoo_sample.cfg conf/zoo.cfg
bin/zkServer.sh start 2181 启动
bin/zkCli.sh -server 127.0.0.1:2181 进入终端
分布式配置 https://blog.csdn.net/java_66666/article/details/81015302
单节点启动kafka
kafka配置文件 config/server.properties
TODO
log.dir = /xxx 实际上是数据持久化存储的位置
// 配置公网访问
advertised.listeners=PLAINTEXT://你的公网ip:9093
advertised.listeners=PLAINTEXT://你的公网i:9092
kafka启动
bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties 后台启动
启动kafka集群
配置文件
broker.id=1、broker.id=2 不能重复
群起脚本示例
// 尚硅谷的大数据教程里面的
for i in hadoop102 hadoop103 hadoop104
do
echo "========== $i =========="
ssh $i '/opt/module/kafka/bin/kafka-server-start.sh -daemon
/opt/module/kafka/config/server.properties'
done
基本的命令行操作
创建topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
选项说明:
--topic 定义 topic 名
--replication-factor 定义副本数 (不能超过集群内zk的数量,否则报错)
--partitions 定义分区数
查看topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
删除topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
- 需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除
生产消息
bin/kafka-console-producer.sh --brokerlist hadoop102:9092 --topic first
消费消息
普通消费
// 消费当前的消息 从zk读
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
// 某台消费者后来才启动,可以从头消费消息
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic first
- --from-beginning:会把主题中以往所有的数据都读取出来。
新版本消费
// 从kafka读消息
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka高级
存储
生产者
ack 0 1 -1(ISR)
ISR (HW LEO) 多退少补
消费者
高效存储
- 顺序读写
- 零拷贝技术
- 分布式(多个主机多个分区并发读写)
事物 生产者事物(跨回话)
kafka监控工具
eagel
- 1
- 需要加一下环境变量
export KE_HOME=/opt/xxx
export PATN=$PATN:$KE_HOME:bin
- 可能出现问题的配置 我自己没有实际操作过
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
kafka面试题
https://blog.csdn.net/C_Xiang_Falcon/article/details/100917145#1KafkaISRInSyncRepliOSROutSyncRepliARAllRepli_3
xsync 工具
- https://www.cnblogs.com/Mark-blog/p/11603313.html
- https://www.jianshu.com/p/e74fbb091144
go操作kafka
依赖为github的samara
- 参考
https://github.com/Shopify/sarama
生产消息
同步发送消息
func producer1() {
var address = []string{"kafka1:9092"}
// creates a new SyncProducer
// config := sarama.NewConfig()
// 可以根据定制配置
producer, err := sarama.NewSyncProducer(address, nil)
handleErr()
defer producer.Close()
// 构建返回值
result := map[string]string{
"ip": "127.0.0.1",
"info": "需要发送的消息",
}
byteresult, _ := json.Marshal(result)
value := string(byteresult)
// 发送消息对象
msg := &sarama.ProducerMessage{ // ProducerMessage 发送消息的对象
Topic: "monitor",
Value: sarama.ByteEncoder(value),
Key: sarama.ByteEncoder("applicationStop"),
}
// 发送消息
partition, offset, err := producer.SendMessage(msg)
fmt.Printf("发送信息成功,topic::%s,partition=%d, offset=%d \n", topic, partition, offset)
// fmt.Fprintf(os.Stdout, value+"发送成功,partition=%d, offset=%d \n", partition, offset)
}
消费消息
普通一个消费者
func consumer() {
consumer, err := sarama.NewConsumer(address, nil)
if err != nil {
fmt.Println("consumer()::sarama.NewConsumer::", err)
return
}
// close
defer func() {
if err := consumer.Close(); err != nil {
fmt.Println("consumer()::consumer.Close()::", err)
}
}()
// 从 topic 消费 // 消费最新数据
partitionConsumer, err := consumer.ConsumePartition("monitor", 0, sarama.OffsetNewest)
if err != nil {
fmt.Println("consumer()::consumer.ConsumePartition::", err)
return
}
// close
defer func() {
if err := partitionConsumer.Close(); err != nil {
fmt.Println("consumer()::partitionConsumer.Close()::", err)
}
}()
// 监听消息
for {
select {
// ConsumerMessage 接收消息的对象
case msg := <-partitionConsumer.Messages():
if string(msg.Key) == "basic" {
result := map[string]string{}
err := json.Unmarshal(msg.Value, &result)
fmt.Println("消息为::",result)
}
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)