LogAgent的工作流程:
1.读日志——tailf第三方库
2.往kafka中写日志 – sarama第三方库

kafka参考网站
介绍:
Kafaka是一个分布式数据流平台,可以运行在单台服务器上,也可以在多台服务器上部署形成集群。它提供了发布和订阅功能,使用者可以发送数据到Kafka中,也可以从Kafka中读取数据(以便进行后续的处理)。Kafka具有高吞吐、低延迟、高容错等特点。
在这里插入图片描述
在这里插入图片描述

1.Kafaka集群的架构:
1.broker:每台机器都是一个broker
2.topic:每个日志都给一个分类
3.partition:分区,把同一个topic分成不同的分区,提高负载
1.leader:分区的主节点(老大)
2.follower:分区的从节点(小弟)
4.Consumer Group
2.生产者往Kafka发送数据的流程(6步)
在这里插入图片描述
3.Kafka选择分区的模式(3种)
1.指定往哪个分区写
2.指定key,kafka根据key做hash然后决定写哪个分区
3.轮询方式
4.生产者往kafka发送数据的模式(3种)
1.0:把数据发给leader就成功,效率最高,安全性最低
2.1:把数据发送给leader,等待leader回ACK
3.all:把数据发给leader,follower从leader拉取数据回复ack给leader,leader再回复ACK;安全性最高
5.分区存储文件的原理
6.为什么kafka快?(随机读变成顺序读,记住了每个文件在磁盘中的位置)
7.消费者组
在这里插入图片描述
每个消费者实例可以消费多个分区,但是每个分区最多只能被消费者组中的一个实例消费。

zookeeper工作原理:
每当主机启动服务时,先登记到zookeeper中,zookeeper会生成一个目录项,当调用服务时,会先查zookeeper
在这里插入图片描述

tail库:

//tailf读文件实例
package main

import (
	"fmt"
	"time"

	"github.com/hpcloud/tail"
)

func main() {
	fileName := "./my.log"
	config := tail.Config{
		ReOpen:    true,                                 //重新打开
		Follow:    true,                                 //是否跟随
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件哪个地方开始读
		MustExist: false,                                //文件不存在不报错
		Poll:      true,                                 
	}
	tails, err := tail.TailFile(fileName, config)
	if err != nil {
		fmt.Println("tail file failed,err:", err)
		return
	}
	var (
		msg *tail.Line
		ok  bool
	)
	for {
		msg, ok = <-tails.Lines
		if !ok {
			fmt.Println("tail file close reopen,filename:", err)
			time.Sleep(time.Second)
			continue
		}
		fmt.Println("msg:", msg.Text)
	}
}

kafka的启用:
先执行zookeeper的zkServer.cmd
然后执行在D:\apache-zookeeper-3.8.0-bin下执行以下命令:

kafka启用命令
.\bin\windows\kafka-server-start.bat .\config\server.properties

执行下列代码:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	//tailf包使用
	config.Producer.RequiredAcks = sarama.WaitForAll          //发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner //新选出一个partition
	config.Producer.Return.Successes = true                   //成功交付的消息将在success channel返回
	//构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "web_log"
	msg.Value = sarama.StringEncoder("this is a test log")
	//连接kafka
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("producer closed,err:", err)
		return
	}
	fmt.Println("连接kafka成功!")
	defer client.Close()
	pid, offset, err := client.SendMessage(msg) //offset是写成功的文件的索引位置
	if err != nil {
		fmt.Println("send msg failed,err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

kafka消费实例

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

//kafka 消费者 实例

func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer,err:%v\n", err)
		return
	}
	partitionList, err := consumer.Partitions("web_log") //根据topic取到所有的分区
	if err != nil {
		fmt.Println("fail to get list of partition:", err)
		return
	}
	fmt.Println(partitionList)
	for partition := range partitionList {
		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v", partition, err)
			return
		}
		defer pc.AsyncClose()
		//异步从每个分区消费消息
		go func(sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
			}
		}(pc)

	}
	select {}
}

在这里插入图片描述在这里插入图片描述
打开kafka终端消费者读取数据

bin\windows\kafka-console-consumer.bat --bootstrap-server=127.0.0.1:9092 --topic=web_log --from-beginning

在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐