消息队列

  • 一对一
  • 一对多 一个生产者多个消费者
    • 消息主动推送给消费者(推送能力一样,但是每个消费者的消费能力不一样)
    • 消费者主动去拉去消息(需要长期去轮询查询是否有消息) kafak

kafka架构

  • leader 消费者找leader要消息
  • follwer 用于集群间的数据同步备份
  • 同一个分区消息只能被同一个消费组里的某一个消费者消费

安装kafka

kafak依赖zookeeper 需要先启动zk(集群)
zookeeper 启动
cp conf/zoo_sample.cfg conf/zoo.cfg
bin/zkServer.sh start     2181 启动
bin/zkCli.sh -server 127.0.0.1:2181 进入终端

分布式配置 https://blog.csdn.net/java_66666/article/details/81015302

单节点启动kafka
kafka配置文件 config/server.properties
TODO
log.dir = /xxx   实际上是数据持久化存储的位置

// 配置公网访问
advertised.listeners=PLAINTEXT://你的公网ip:9093
advertised.listeners=PLAINTEXT://你的公网i:9092
kafka启动
bin/kafka-server-start.sh config/server.properties
 bin/kafka-server-start.sh -daemon  config/server.properties  后台启动

启动kafka集群
配置文件
broker.id=1、broker.id=2 不能重复
群起脚本示例
// 尚硅谷的大数据教程里面的
for i in hadoop102 hadoop103 hadoop104
do
echo "========== $i ==========" 
ssh $i '/opt/module/kafka/bin/kafka-server-start.sh -daemon 
/opt/module/kafka/config/server.properties'
done
基本的命令行操作
创建topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first

选项说明:
	--topic 定义 topic 名
	--replication-factor 定义副本数 (不能超过集群内zk的数量,否则报错)
	--partitions 定义分区数
查看topic
  • bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
删除topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first

 - 需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除
生产消息
 bin/kafka-console-producer.sh --brokerlist hadoop102:9092 --topic first
消费消息
普通消费
// 消费当前的消息 从zk读
 bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
 // 某台消费者后来才启动,可以从头消费消息
 bin/kafka-console-consumer.sh --zookeeper hadoop102:2181   --from-beginning --topic first 
	- --from-beginning:会把主题中以往所有的数据都读取出来。
新版本消费
// 从kafka读消息
 bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

kafka高级

存储
生产者
ack 0 1 -1(ISR)
ISR (HW LEO) 多退少补
消费者
高效存储
  • 顺序读写
  • 零拷贝技术
  • 分布式(多个主机多个分区并发读写)
事物 生产者事物(跨回话)

kafka监控工具

eagel
  • 1
  • 需要加一下环境变量
export KE_HOME=/opt/xxx
export PATN=$PATN:$KE_HOME:bin
  • 可能出现问题的配置 我自己没有实际操作过
 export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
 export JMX_PORT="9999"

kafka面试题

  • https://blog.csdn.net/C_Xiang_Falcon/article/details/100917145#1KafkaISRInSyncRepliOSROutSyncRepliARAllRepli_3
xsync 工具
  • https://www.cnblogs.com/Mark-blog/p/11603313.html
  • https://www.jianshu.com/p/e74fbb091144

go操作kafka

依赖为github的samara
  • 参考 https://github.com/Shopify/sarama
生产消息
同步发送消息
func producer1() {
	var address = []string{"kafka1:9092"}

	// creates a new SyncProducer
	// config := sarama.NewConfig()
	// 可以根据定制配置
	producer, err := sarama.NewSyncProducer(address, nil)
	handleErr()
	defer producer.Close()

	// 构建返回值
	result := map[string]string{
		"ip":      "127.0.0.1", 
		"info":     "需要发送的消息",
	}
	byteresult, _ := json.Marshal(result)
	value := string(byteresult)
	// 发送消息对象
	msg := &sarama.ProducerMessage{ // ProducerMessage 发送消息的对象
		Topic: "monitor", 
		Value: sarama.ByteEncoder(value),
		Key:   sarama.ByteEncoder("applicationStop"),
	}
	// 发送消息
	 partition, offset, err := producer.SendMessage(msg)
	fmt.Printf("发送信息成功,topic::%s,partition=%d, offset=%d \n", topic, partition, offset)
	// fmt.Fprintf(os.Stdout, value+"发送成功,partition=%d, offset=%d \n", partition, offset)
}

消费消息
普通一个消费者
func consumer() {

	consumer, err := sarama.NewConsumer(address, nil)
	if err != nil {
		fmt.Println("consumer()::sarama.NewConsumer::", err)
		return
	}
	// close
	defer func() {
		if err := consumer.Close(); err != nil {
			fmt.Println("consumer()::consumer.Close()::", err)
		}
	}()

	// 从 topic 消费 // 消费最新数据
	partitionConsumer, err := consumer.ConsumePartition("monitor", 0, sarama.OffsetNewest)
	if err != nil {
		fmt.Println("consumer()::consumer.ConsumePartition::", err)
		return
	}
	// close
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			fmt.Println("consumer()::partitionConsumer.Close()::", err)
		}
	}()
	// 监听消息
	for {
		select {
		// ConsumerMessage 接收消息的对象
		case msg := <-partitionConsumer.Messages(): 

			if string(msg.Key) == "basic" {
				result := map[string]string{}
				err := json.Unmarshal(msg.Value, &result)
				fmt.Println("消息为::",result)
		}
	}
}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐