Log collection with the traditional ELK architecture:
(diagram: traditional ELK architecture)
Problems: Logstash is resource-heavy and consumes a lot of CPU and memory while running. There is also no message queue to buffer the data, so there is a risk of data loss. This setup is only suitable for small clusters.

The second architecture:
(diagram: Kafka-buffered log collection architecture)
The Log Agent on each node first sends its logs to Kafka; a Log Transfer process then takes the messages off the queue and writes them to Elasticsearch for storage. Finally, Kibana presents the logs and data to the user. Because Kafka is introduced, the data is persisted first, so even if the Logstash/transfer server stops because of a failure, no data is lost. This architecture is suitable for larger clusters.
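To make the Kafka hand-off concrete, here is a minimal round-trip sketch (my own illustration, not part of the project code below): it produces one log line and immediately reads it back with the same Shopify/sarama library the agent uses, assuming a local broker at 127.0.0.1:9092 and an existing web_log topic. Because the message is persisted in the Kafka log until a consumer reads it, a temporarily stopped transfer process simply resumes where it left off.

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	brokers := []string{"127.0.0.1:9092"} // assumed local broker, matching config.ini below

	// Producer side: what the Log Agent does after reading one log line.
	pcfg := sarama.NewConfig()
	pcfg.Producer.RequiredAcks = sarama.WaitForAll
	pcfg.Producer.Return.Successes = true
	producer, err := sarama.NewSyncProducer(brokers, pcfg)
	if err != nil {
		log.Fatal("new producer:", err)
	}
	defer producer.Close()
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "web_log",
		Value: sarama.StringEncoder("hello from LogAgent"),
	})
	if err != nil {
		log.Fatal("send:", err)
	}
	fmt.Printf("stored at partition %d offset %d\n", partition, offset)

	// Consumer side: what the Log Transfer does before writing to Elasticsearch.
	consumer, err := sarama.NewConsumer(brokers, nil)
	if err != nil {
		log.Fatal("new consumer:", err)
	}
	defer consumer.Close()
	pc, err := consumer.ConsumePartition("web_log", partition, offset)
	if err != nil {
		log.Fatal("consume partition:", err)
	}
	defer pc.Close()
	msg := <-pc.Messages()
	fmt.Printf("read back: %s\n", msg.Value)
}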

Component overview:
LogAgent: the log collection client, which gathers logs on each server.
Kafka: a high-throughput distributed message queue (developed by LinkedIn, an Apache top-level project), used here as a message queue and log store.
Elasticsearch: an open-source search engine that exposes an HTTP RESTful web interface.
Kibana: an open-source analytics and visualization tool for Elasticsearch data.
Hadoop: a distributed computing framework, a platform for the distributed processing of large data sets.
Storm: a free and open-source distributed real-time computation system.

Zookeeper: ZooKeeper is a distributed coordination service used to manage a large fleet of hosts. Coordinating and managing services in a distributed environment is a complex task, and ZooKeeper solves it with a simple architecture and API.
In the deployment diagram, ZooKeeper plays the part shown in red:
(diagram: ZooKeeper shown in red)

Elasticsearch: a distributed, scalable, real-time search and analytics engine built on top of the full-text search library Apache Lucene™. Elasticsearch is more than just Lucene, though; besides full-text search it also provides:
distributed real-time document storage, with every field indexed and searchable;
a distributed search engine for real-time analytics;
scaling out to hundreds of servers, handling petabytes of structured or unstructured data.
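As a quick illustration of that HTTP RESTful interface, the sketch below (my own example, not from the project) indexes one document and searches for it again using only Go's standard library; it assumes a single local node at 127.0.0.1:9200, the same address used in the configuration files later on. The ?refresh=true parameter is only there so the document is immediately searchable in the demo.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	es := "http://127.0.0.1:9200" // assumed local Elasticsearch node

	// Index one document: PUT /<index>/_doc/<id> with a JSON body.
	body := bytes.NewBufferString(`{"topic":"web_log","data":"GET /index.html 200"}`)
	req, err := http.NewRequest(http.MethodPut, es+"/web_log/_doc/1?refresh=true", body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	b, _ := io.ReadAll(resp.Body)
	fmt.Println("index response:", string(b))

	// Full-text search over the indexed field: GET /<index>/_search?q=<field>:<term>
	resp2, err := http.Get(es + "/web_log/_search?q=data:index.html")
	if err != nil {
		panic(err)
	}
	defer resp2.Body.Close()
	b2, _ := io.ReadAll(resp2.Body)
	fmt.Println("search response:", string(b2))
}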

Kibana is an open-source analytics and visualization platform designed to work with Elasticsearch. You can use Kibana to search and view data stored in Elasticsearch; it presents that data through charts, tables, maps, and other visualizations, enabling advanced analysis and visualization.
Elasticsearch, Logstash, and Kibana together are what we usually call the ELK stack, and the combination is an elegant design in the big-data space. It follows a classic MVC split: model (persistence layer), view, and controller. Logstash plays the controller, collecting and filtering data; Elasticsearch is the persistence layer, storing the data; and Kibana, the focus here, is the view layer, offering queries and analysis across many dimensions and a graphical interface to the data stored in Elasticsearch.

etcd is a distributed key-value store developed by CoreOS. It uses the Raft protocol internally as its consensus algorithm and is designed to store critical data reliably and serve it quickly. It supports reliable distributed coordination through distributed locks, leader election, and write barriers, and an etcd cluster is built for high availability and for persistent data storage and retrieval.
etcd architecture diagram:
(diagram: etcd architecture)
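The project below only uses etcd's put/get/watch APIs, but as a small illustration of the distributed locks mentioned above, here is a sketch of my own built on the clientv3/concurrency package (assuming a local etcd at 127.0.0.1:2379 and the same go.etcd.io/etcd/clientv3 module imported by the code later):

package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// A session keeps a lease alive in the background; if this process dies,
	// the lease expires and the lock is released automatically.
	session, err := concurrency.NewSession(cli)
	if err != nil {
		panic(err)
	}
	defer session.Close()

	mutex := concurrency.NewMutex(session, "/lock/demo") // competing clients must use the same prefix
	if err := mutex.Lock(context.Background()); err != nil {
		panic(err)
	}
	fmt.Println("lock acquired, doing exclusive work...")
	// ... critical section ...
	if err := mutex.Unlock(context.Background()); err != nil {
		panic(err)
	}
	fmt.Println("lock released")
}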

Source code:

logagent package

config.ini

[kafka]
address=127.0.0.1:9092
chan_max_size=100000

[etcd]
address=127.0.0.1:2379
timeout=5
collect_log_key=/log/%s/collect_config

[taillog]
filename=./my.log
timeout=5

config.go

package conf

type AppConf struct {
	KafkaConf `ini:"kafka"`
	EtcdConf  `ini:"etcd"`
}

type KafkaConf struct {
	Address     string `ini:"address"`
	ChanMaxSize int    `ini:"chan_max_size"`
}

type EtcdConf struct {
	Address string `ini:"address"`
	Key     string `ini:"collect_log_key"`
	Timeout int    `ini:"timeout"`
}

type TaillogConf struct {
	Filename string `ini:"filename"`
}

etcd.go

package etcd

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

var (
	cli *clientv3.Client
)

// LogEntry describes one log file to collect and the Kafka topic it maps to
type LogEntry struct {
	Path  string `json:"path"`
	Topic string `json:"topic"`
}

// Init initializes the etcd client; it returns an error if the connection fails
func Init(addr string, timeout time.Duration) (err error) {
	cli, err = clientv3.New(clientv3.Config{
		Endpoints:   []string{addr},
		DialTimeout: timeout,
	})
	if err != nil {
		fmt.Println("connect to etcd failed,err:", err)
		return
	}
	fmt.Println("connect to etcd success")
	return
}

// GetConf fetches the log collection config stored under key from etcd
func GetConf(key string) (LogEntryConf []*LogEntry, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	resp, err := cli.Get(ctx, key)
	cancel()
	if err != nil {
		fmt.Println("put to etcd failed,err:", err)
		return
	}
	for _, ev := range resp.Kvs {
		err = json.Unmarshal(ev.Value, &LogEntryConf)
		if err != nil {
			fmt.Println("unmarshal etcd value failed,err:", err)
			return
		}
		fmt.Printf("value:%s\n", ev.Value)
	}
	return
}

// WatchConf watches the given key and pushes every new config to newConfCh
func WatchConf(key string, newConfCh chan<- []*LogEntry) {
	ch := cli.Watch(context.Background(), key)
	// read watch events from the channel
	for wresp := range ch {
		for _, evt := range wresp.Events {
			fmt.Printf("Type:%v key:%v value:%v\n", evt.Type, string(evt.Kv.Key), string(evt.Kv.Value))
			// notify taillog.tskMgr
			// 1. check what kind of operation this event is
			var newConf []*LogEntry
			if evt.Type == clientv3.EventTypeDelete {
				// the key was deleted: push an empty config so that all running tasks get stopped
				fmt.Println("config key deleted, pushing an empty config")
			} else {
				err := json.Unmarshal(evt.Kv.Value, &newConf)
				if err != nil {
					fmt.Println("unmarshal failed,err:", err)
					continue
				}
			}
			fmt.Println("get new conf:", newConf)
			newConfCh <- newConf
		}

	}
}

etcd_put.go

package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, //节点
		DialTimeout: 5 * time.Second,            //超过5秒钟连不上超时
	})
	if err != nil {
		fmt.Println("connect to etcd failed:", err)
		return
	}
	fmt.Println("connect to etcd success")
	defer cli.Close()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	value := `[{"path":"d:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"},{"path":"d:/xxx/mysql.log","topic":"mysql_log"}]`
	_, err = cli.Put(ctx, "/log/192.168.1.7/collect_config", value)
	cancel()
	if err != nil {
		fmt.Println("put to etcd failed,err:", err)
		return
	}
}

kafka.go

package kafka

// Kafka producer side of the log agent
import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

type logData struct {
	topic string
	data  string
}

var (
	client      sarama.SyncProducer // global Kafka producer client
	logDataChan chan *logData
)

// Init creates the producer client and starts the sending goroutine
func Init(addrs []string, maxSize int) (err error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // wait until the leader and all followers have acknowledged
	config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
	config.Producer.Return.Successes = true                   // successfully delivered messages are reported on the Successes channel
	// connect to kafka
	client, err = sarama.NewSyncProducer(addrs, config)
	if err != nil {
		fmt.Println("producer closed,err:", err)
		return
	}
	fmt.Println("connect to kafka success")
	// initialize logDataChan
	logDataChan = make(chan *logData, maxSize)
	// start a background goroutine that drains the channel and sends the data to kafka
	go SendToKafka()
	return
}

// SendToChan is exposed to callers; it only pushes the log data into an internal channel
func SendToChan(topic, data string) {
	msg := &logData{
		topic: topic,
		data:  data,
	}
	logDataChan <- msg
}

// SendToKafka is the background goroutine that actually sends the logs to kafka
func SendToKafka() {
	for {
		select {
		case ld := <-logDataChan:
			// build a message
			msg := &sarama.ProducerMessage{}
			msg.Topic = ld.topic
			msg.Value = sarama.StringEncoder(ld.data)
			// send to kafka
			pid, offset, err := client.SendMessage(msg) // offset is the position in the partition where the message was written
			if err != nil {
				fmt.Println("send msg failed,err:", err)
				return
			}
			fmt.Printf("pid:%v offset:%v\n", pid, offset)
		default:
			time.Sleep(time.Millisecond * 50)
		}
	}
}

tail.go

package taillog

import (
	"context"
	"fmt"
	"test/log/kafka"

	"github.com/hpcloud/tail"
)

var (
	tailObj *tail.Tail
	LogChan chan string
)

// TailTask is a single log collection task
type TailTask struct {
	path     string
	topic    string
	instance *tail.Tail
	// used to stop t.run()
	ctx        context.Context
	cancelFunc context.CancelFunc
}

func NewTailTask(path, topic string) (tailObj *TailTask) {
	ctx, cancel := context.WithCancel(context.Background())
	tailObj = &TailTask{
		path:       path,
		topic:      topic,
		ctx:        ctx,
		cancelFunc: cancel,
	}
	tailObj.init() // open the log file at the given path
	return
}

func (t *TailTask) init() {
	config := tail.Config{
		ReOpen:    true,                                 // reopen the file if it is recreated (log rotation)
		Follow:    true,                                 // keep following new lines
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // where to start reading: the end of the file
		MustExist: false,                                // do not error if the file does not exist yet
		Poll:      true,
	}
	var err error
	t.instance, err = tail.TailFile(t.path, config)
	if err != nil {
		fmt.Println("tail file failed,err:", err)
	}
	go t.run() // start collecting from the log and sending to kafka
}

func (t *TailTask) run() {
	for {
		select {
		case <-t.ctx.Done():
			fmt.Printf("tail tast:%v_%s finish...\n", t.path, t.topic)
			return
		case line := <-t.instance.Lines: // read log lines from the tail channel one at a time
			// push the log line into a channel first;
			// the kafka package has its own goroutine that drains it and sends the data to kafka
			kafka.SendToChan(t.topic, line.Text)

		}
	}
}

tail_mgr.go

package taillog

import (
	"fmt"
	"test/log/etcd"
	"time"
)

var tskMgr *taillogMgr

// taillogMgr manages all TailTask instances
type taillogMgr struct {
	logEntry    []*etcd.LogEntry
	tskMap      map[string]*TailTask
	newConfChan chan []*etcd.LogEntry
}

func Init(logEntryConf []*etcd.LogEntry) {
	tskMgr = &taillogMgr{
		logEntry:    logEntryConf, // keep the current log collection config
		tskMap:      make(map[string]*TailTask, 16),
		newConfChan: make(chan []*etcd.LogEntry), // unbuffered channel
	}
	for _, LogEntry := range logEntryConf {
		// LogEntry: *etcd.LogEntry
		// LogEntry.Path: path of the log file to collect
		// remember every TailTask started at init time so that later comparisons are easy
		tailObj := NewTailTask(LogEntry.Path, LogEntry.Topic)
		mk := fmt.Sprintf("%s_%s", LogEntry.Path, LogEntry.Topic)
		tskMgr.tskMap[mk] = tailObj
	}
	go tskMgr.run()
}

// run listens on newConfChan and reacts whenever a new config arrives
func (t *taillogMgr) run() {
	for {
		select {
		case newConf := <-t.newConfChan:
			fmt.Println("新的配置来了!", newConf)
			for _, conf := range newConf {
				mk := fmt.Sprintf("%s_%s", conf.Path, conf.Topic)
				_, ok := t.tskMap[mk]
				if ok {
					// already running, nothing to do
					continue
				} else {
					// newly added entry: start a task for it
					tailObj := NewTailTask(conf.Path, conf.Topic)
					t.tskMap[mk] = tailObj
				}
			}
			// find entries that exist in the old config but not in newConf; their tasks must be stopped
			for _, c1 := range t.logEntry { // take each entry from the old config
				isDelete := true
				for _, c2 := range newConf { // compare it against every entry in the new config
					if c2.Path == c1.Path && c2.Topic == c1.Topic {
						isDelete = false
						break
					}
				}
				if isDelete {
					// stop and forget the tailObj that belongs to c1
					mk := fmt.Sprintf("%s_%s", c1.Path, c1.Topic)
					t.tskMap[mk].cancelFunc()
					delete(t.tskMap, mk)
				}
			}
			t.logEntry = newConf // remember the new config for the next comparison
		// cases handled: 1. config added  2. config deleted  3. config changed
		default:
			time.Sleep(time.Second)
		}
	}
}

// NewConfChan exposes tskMgr's newConfChan to the outside
func NewConfChan() chan<- []*etcd.LogEntry {
	return tskMgr.newConfChan
}

ip.go

package utils

import (
	"net"
	"strings"
)

// GetOutboundIP returns the outbound (local) IP address of this host
func GetOutboundIP() (ip string, err error) {
	conn, err := net.Dial("udp", "8.8.8.8:80")
	if err != nil {
		return
	}
	defer conn.Close()
	localAddr := conn.LocalAddr().(*net.UDPAddr)
	ip = strings.Split(localAddr.IP.String(), ":")[0]
	return
}

main.go

package main

import (
	"fmt"
	"sync"
	"test/log/conf"
	"test/log/etcd"
	"test/log/kafka"
	"test/log/taillog"
	"test/log/utils"
	"time"

	"gopkg.in/ini.v1"
)

var (
	cfg = new(conf.AppConf)
)

func main() {
	// 0. load the config file
	err := ini.MapTo(cfg, "./conf/config.ini")
	if err != nil {
		fmt.Println("load ini failed,err:", err)
		return
	}
	// 1. initialize a kafka connection
	err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.ChanMaxSize)
	if err != nil {
		fmt.Println("init kafka failed,err:", err)
		return
	}
	fmt.Println("初始化成功!")
	//2.初始化etcd
	etcd.Init(cfg.EtcdConf.Address, time.Duration(cfg.EtcdConf.Timeout)*time.Second)
	if err != nil {
		fmt.Println("init etcd failed,err:", err)
		return
	}
	// each logagent pulls its own config, so its own IP address is used to distinguish the key
	ipStr, err := utils.GetOutboundIP()
	if err != nil {
		panic(err)
	}
	etcdConfKey := fmt.Sprintf(cfg.EtcdConf.Key, ipStr)
	fmt.Printf("etcdConfKey:%s\n", etcdConfKey)
	// 2.1 fetch the log collection configuration from etcd
	logEntryConf, err := etcd.GetConf(etcdConfKey)
	if err != nil {
		fmt.Println("etcd.GetConf failed,err:", err)
		return
	}
	fmt.Println("get conf from etcd success:", logEntryConf)
	// 2.2 start a watcher that monitors changes to the log collection config and notifies the logAgent so it can reload

	for index, value := range logEntryConf {
		fmt.Printf("index:%v value:%v\n", index, value)
	}
	fmt.Println("init etcd success.")

	// 3. collect logs and send them to Kafka
	// 3.1 create a TailTask for every log collection entry
	taillog.Init(logEntryConf)
	// NewConfChan reads tskMgr's newConfChan, and that channel is initialized inside taillog.Init(logEntryConf)
	newConfChan := taillog.NewConfChan() // get the channel exposed by the taillog package
	var wg sync.WaitGroup
	wg.Add(1)
	go etcd.WatchConf(etcdConfKey, newConfChan) // the watcher pushes any new config into the channel above
	wg.Wait()
	// 3.2 send to Kafka

	// 4. open the log files and start collecting
}

log_transfer package

cfg.ini

[kafka]
address=127.0.0.1:9092
topic=web_log

[es]
address=127.0.0.1:9200
size=100000

cfg.go

package conf

// Logtransfer is the global configuration for log_transfer
type Logtransfer struct {
	KafkaCfg `ini:"kafka"`
	ESCfg    `ini:"es"`
}

// KafkaCfg holds the Kafka consumer settings
type KafkaCfg struct {
	Address string `ini:"address"`
	Topic   string `ini:"topic"`
}

// ESCfg holds the Elasticsearch settings
type ESCfg struct {
	Address  string `ini:"address"`
	ChanSize int    `ini:"size"`
}

es.go

package es

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/olivere/elastic/v7"
)

// initialize ES and get ready to receive the data sent over from kafka

type LogData struct {
	Topic string `json:"topic"`
	Data  string `json:"data"`
}

var (
	client *elastic.Client
	ch     chan *LogData
)

// Init creates the ES client and starts the goroutine that writes to ES
func Init(address string, chanSize int) (err error) {
	if !strings.HasPrefix(address, "http://") {
		address = "http://" + address
	}
	client, err = elastic.NewClient(elastic.SetURL(address))
	if err != nil {
		return
	}
	fmt.Println("connect to es success")
	ch = make(chan *LogData, chanSize)
	go SendToES()
	return
}

// func SendToESChan(d *LogData) (err error) {
// 	msg := &LogData{}
// 	msg.Topic = d.Topic
// 	msg.Data = string(d.Data)

// 	_, err = client.Index().
// 		Index(d.Topic).
// 		BodyJson(msg).
// 		Do(context.Background())
// 	if err != nil {
// 		panic(err)
// 	}
// 	return
// }

func SendToESChan(msg *LogData) {
	ch <- msg
}

// SendToES drains the channel and writes each entry into Elasticsearch
func SendToES() {
	// chained builder calls
	for {
		select {
		case msg := <-ch:
			put1, err := client.Index().
				Index(msg.Topic). // the index name acts like a database
				Type("xxx").
				BodyJson(msg). // marshals the Go value to JSON
				Do(context.Background())
			if err != nil {
				fmt.Println(err)
			}
			fmt.Printf("Indexed %s to index %s,type %s\n", put1.Id, put1.Index, put1.Type)
		default:
			time.Sleep(time.Second)
		}
	}
}

kafka.go

package kafka

import (
	"fmt"
	"test/log_transfer/es"

	"github.com/Shopify/sarama"
)

//LogData...
type LogData struct {
	Data string `json:"data"`
}

// Init creates a kafka consumer for every partition of topic and forwards the data to ES
func Init(addr []string, topic string) (err error) {
	consumer, err := sarama.NewConsumer(addr, nil)
	if err != nil {
		fmt.Printf("fail to start consumer,err:%v\n", err)
		return
	}
	partitionList, err := consumer.Partitions(topic) // get all partitions for this topic
	if err != nil {
		fmt.Println("fail to get list of partition:", err)
		return
	}
	fmt.Println(partitionList)
	for partition := range partitionList {
		var pc sarama.PartitionConsumer
		pc, err = consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		// consume messages from each partition asynchronously
		go func(pc sarama.PartitionConsumer) {
			defer pc.AsyncClose()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
				// push the message onto the ES package's channel
				ld := es.LogData{
					Topic: topic,
					Data:  string(msg.Value),
				}
				es.SendToESChan(&ld)
			}
		}(pc)
	}
	return
}

main.go

package main

import (
	"fmt"
	"test/log_transfer/conf"
	"test/log_transfer/es"
	"test/log_transfer/kafka"

	"gopkg.in/ini.v1"
)

// log transfer
// pull log data out of kafka and send it to ES

func main() {
	// 0. load the config file
	var cfg conf.Logtransfer
	err := ini.MapTo(&cfg, "./conf/cfg.ini")
	if err != nil {
		fmt.Println("init config err:", err)
		return
	}
	fmt.Printf("cfg:%v\n", cfg)
	// 1. initialize ES
	// 1.1 create an ES client
	// 1.2 expose a function that writes data into ES
	err = es.Init(cfg.ESCfg.Address, cfg.ESCfg.ChanSize)
	if err != nil {
		fmt.Println("init ES consumer failed,err:", err)
		return
	}
	fmt.Println("init es success.")
	// 2. initialize Kafka
	// 2.1 connect to kafka and create a consumer per partition
	// 2.2 each partition consumer pulls data and forwards it to ES via SendToES()
	err = kafka.Init([]string{cfg.KafkaCfg.Address}, cfg.KafkaCfg.Topic)
	if err != nil {
		fmt.Println("init kafka consumer failed,err:", err)
		return
	}
	// 3. pull data from kafka

	// 4. send it to ES
	select {}
}

Self-written version:

config package:

config.ini

[kafka]
address=127.0.0.1:9092
chan_max_size=100000

[etcd]
address=127.0.0.1:2379
timeout=5
log_key=/log/collect_config

[es]
address=127.0.0.1:9200
size=100000

config.go

package config

import (
	"context"

	"github.com/hpcloud/tail"
)

type AppConf struct {
	KafkaConf `ini:"kafka"`
	EtcdConf  `ini:"etcd"`
	ESConf    `ini:"es"`
}

type KafkaConf struct {
	Address  string `ini:"address"`
	Max_size int    `ini:"chan_max_size"`
}

type EtcdConf struct {
	Address string `ini:"address"`
	Timeout int    `ini:"timeout"`
	Log_key string `ini:"log_key"`
}

type ESConf struct {
	Address  string `ini:"address"`
	Max_size int    `ini:"size"`
}

type LogConf struct {
	Path  string `ini:"path"`
	Topic string `ini:"topic"`
}

type LogEntryConf []*LogConf

type TailTask struct {
	Path     string
	Topic    string
	Instance *tail.Tail
	Ctx      context.Context
	CancelF  context.CancelFunc
}

type LogData struct {
	Topic string
	Data  string
}

es package

es.go

package es

import (
	"context"
	"fmt"
	"test/mylog/config"
	"time"

	"github.com/olivere/elastic/v7"
)

var (
	client *elastic.Client
	ESchan chan *config.LogData
)

func Init(address string, size int) (err error) {
	client, err = elastic.NewClient(elastic.SetURL(address))
	if err != nil {
		fmt.Println("Init ES failed,err:", err)
		return
	}
	ESchan = make(chan *config.LogData, size) // use the configured channel size
	go SendToES()
	return
}

func SendToESChan(msg *config.LogData) {
	ESchan <- msg
}

func SendToES() {
	for {
		select {
		case msg := <-ESchan:
			put1, err := client.Index().Index(msg.Topic).BodyJson(msg).Do(context.Background())
			if err != nil {
				panic(err)
			}
			fmt.Printf("Index user:%s to index %s,type:%s\n", put1.Id, put1.Index, put1.Type)
		default:
			time.Sleep(time.Second)
		}
	}
}

etcd package

etcd.go

package etcd

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"test/mylog/config"
	"test/mylog/tail"

	"go.etcd.io/etcd/clientv3"
)

var (
	client  *clientv3.Client
	logdata config.LogEntryConf
)

func Init(address []string, timeout int) (err error) {
	client, err = clientv3.New(clientv3.Config{
		Endpoints:   address,
		DialTimeout: time.Duration(timeout) * time.Second,
	})
	if err != nil {
		fmt.Println("connect to etcd failed,err:\n", err)
		return
	}
	fmt.Println("connect to etcd success!")
	return
}

func GetConf(key string) (logconf config.LogEntryConf, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	var resp *clientv3.GetResponse
	resp, err = client.Get(ctx, key)
	cancel()
	if err != nil {
		fmt.Println("get from etcd failed,err:", err)
		return
	}
	for _, ev := range resp.Kvs {
		err = json.Unmarshal(ev.Value, &logconf)
		if err != nil {
			fmt.Println("Unmarshal json failed:", err)
			return
		}
	}
	return
}

func WatchConf(topic string) {
	rch := client.Watch(context.Background(), topic)
	channel := tail.Get_chan()
	for wresp := range rch {
		for _, ev := range wresp.Events {
			err := json.Unmarshal(ev.Kv.Value, &logdata)
			if err != nil {
				fmt.Println("Update conf failed,err:", err)
				return
			}
			fmt.Println("update config success:", ev.Kv.Value)
			channel <- logdata
		}
	}
}

kafka package

kafka.go

package kafka

import (
	"fmt"
	"test/mylog/config"
	"test/mylog/es"
	"time"

	"github.com/Shopify/sarama"
)

var (
	client      sarama.SyncProducer
	logDataChan chan *config.LogData
	consumer    sarama.Consumer
	pc          sarama.PartitionConsumer
)

func Init(address []string, max_size int) (err error) {
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Partitioner = sarama.NewRandomPartitioner
	cfg.Producer.Return.Successes = true

	client, err = sarama.NewSyncProducer(address, cfg)
	if err != nil {
		fmt.Println("Produce error:", err)
		return
	}

	logDataChan = make(chan *config.LogData, max_size)

	consumer, err = sarama.NewConsumer(address, nil)
	if err != nil {
		fmt.Println("Init consumer failed,err:", err)
		return
	}

	go SendMessage()

	return
}

func SendToChan(topic, data string) {
	var t = &config.LogData{
		Topic: topic,
		Data:  data,
	}
	logDataChan <- t
}

func SendMessage() {
	for {
		select {
		case ld := <-logDataChan:
			msg := sarama.ProducerMessage{}
			msg.Topic = ld.Topic
			msg.Value = sarama.StringEncoder(ld.Data)
			pid, offset, err := client.SendMessage(&msg)
			if err != nil {
				fmt.Println("Send Message error:", err)
			}
			fmt.Printf("pid:%v offser:%v Topic:%v Value:%v\n", pid, offset, ld.Topic, ld.Data)
		default:
		}
	}
}

func Consumer(topic string) {
	partitionList, err := consumer.Partitions(topic)
	if err != nil {
		fmt.Println("Get partitions failed,err:", err)
		return
	}
	for partition := range partitionList {
		pc, err = consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Println("failed to start consumer for partition,err:", err)
			return
		}
		// consume each partition in its own goroutine and forward the messages to ES
		go func(pc sarama.PartitionConsumer) {
			defer pc.AsyncClose()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
				t := &config.LogData{
					Topic: topic,
					Data:  string(msg.Value),
				}
				es.SendToESChan(t)
			}
		}(pc)
	}
}


tail package

tail.go

package tail

import (
	"context"
	"fmt"
	"test/mylog/config"
	"test/mylog/kafka"

	"github.com/hpcloud/tail"
)

type Tasks config.TailTask

var (
	tails      *tail.Tail
	tasks_map  map[string]*config.TailTask
	tasks_chan chan config.LogEntryConf
)

func run(T *config.TailTask) {
	for {
		select {
		case <-T.Ctx.Done():
			return
		case line := <-T.Instance.Lines:
			kafka.SendToChan(T.Topic, line.Text)
		}
	}
}

func Init(Tvalue config.LogEntryConf) error {
	tasks_map = make(map[string]*config.TailTask, 100)
	tasks_chan = make(chan config.LogEntryConf)
	for _, value := range Tvalue {
		base := config.LogConf{
			Path:  value.Path,
			Topic: value.Topic,
		}
		Task, err := NewTask(base)
		name := fmt.Sprintf("%s\\%s", value.Path, value.Topic)
		tasks_map[name] = Task
		if err != nil {
			fmt.Println("Init tail failed,err:", err)
			return err
		}
		go run(Task)
	}
	go Update_Task()
	return nil
}

func Update_Task() {
	for {
		select {
		case new_tasks := <-tasks_chan:
			for _, old_task := range tasks_map {
				name := fmt.Sprintf("%s\\%s", old_task.Path, old_task.Topic)
				tasks_map[name].CancelF()
			}
			for _, new_task := range new_tasks {
				name := fmt.Sprintf("%s\\%s", new_task.Path, new_task.Topic)
				Task, err := NewTask(*new_task)
				if err != nil {
					fmt.Println("init task err:", err)
					return
				}
				tasks_map[name] = Task
				go run(Task) // start collecting for the new task; otherwise the updated config never takes effect
			}
		}
	}
}

func Get_chan() chan config.LogEntryConf {
	return tasks_chan
}

func NewTask(base config.LogConf) (tal *config.TailTask, err error) {
	cfg := tail.Config{
		ReOpen:    true,
		Follow:    true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll:      true,
	}
	ctx, cancel := context.WithCancel(context.Background())
	tails, err = tail.TailFile(base.Path, cfg)
	if err != nil {
		fmt.Println("tail file failed,err:", err)
	}
	tal = &config.TailTask{
		Path:     base.Path,
		Topic:    base.Topic,
		Instance: tails,
		Ctx:      ctx,
		CancelF:  cancel,
	}
	return
}

main.go

package main

import (
	"fmt"
	"sync"
	"test/mylog/config"
	"test/mylog/es"
	"test/mylog/etcd"
	"test/mylog/tail"

	"test/mylog/kafka"

	"gopkg.in/ini.v1"
)

var wg sync.WaitGroup

func main() {
	var cfg config.AppConf
	err := ini.MapTo(&cfg, "./config/config.ini")
	if err != nil {
		fmt.Println("Decode Map failed!", err)
	}
	err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.Max_size)
	if err != nil {
		fmt.Println("init kafka failed", err)
		return
	}
	fmt.Println("init kafka success!")

	err = etcd.Init([]string{cfg.EtcdConf.Address}, cfg.EtcdConf.Timeout)
	if err != nil {
		fmt.Println("init etcd failed", err)
		return
	}
	var path config.LogEntryConf
	path, err = etcd.GetConf(cfg.Log_key)
	if err != nil {
		fmt.Println("get conf from etcd failed", err)
		return
	}
	tail.Init(path)
	es.Init(cfg.ESConf.Address, cfg.ESConf.Max_size)
	for index, value := range path {
		fmt.Printf("index:%v value:%v topic:%v\n", index, value, value.Topic)
		kafka.Consumer(value.Topic)
	}
	wg.Add(1)
	etcd.WatchConf(cfg.EtcdConf.Log_key)
	wg.Done()
}
