Kafka集群间同步数据方案-Flume
Apache Flume 是一个分布式、高可靠、高可用的用来收集、聚合、转移不同来源的大量日志数据到中央数据仓库的工具。
Apache Flume 是一个分布式、高可靠、高可用的用来收集、聚合、转移不同来源的大量日志数据到中央数据仓库的工具。
系统要求
-
Java运行环境 - Java 1.8或更高版本
体系结构
Event是Flume定义的一个数据流传输的最小单元。Agent就是一个Flume的实例,本质是一个JVM进程,该JVM进程控制Event数据流从外部日志生产者那里传输到目的地(或者是下一个Agent)。
学习Flume必须明白这几个概念,Event英文直译是事件,但是在Flume里表示数据传输的一个最小单位(被Flume收集的一条条日志又或者一个个的二进制文件,不管你在外面叫什么,进入Flume之后它就叫event)。参照下图可以看得出Agent就是Flume的一个部署实例, 一个完整的Agent中包含了必须的三个组件Source、Channel和Sink,Source是指数据的来源和方式,Channel是一个数据的缓冲池,Sink定义了数据输出的方式和目的地。
Source消耗由外部(如Web服务器)传递给它的Event。外部以Flume Source识别的格式向Flume发送Event。例如,Avro Source 可接收从Avro客户端(或其他FlumeSink)接收Avro Event。用 Thrift Source 也可以实现类似的流程,接收的Event数据可以是任何语言编写的只要符合Thrift协议即可。
kafka集群间同步方案
flume同步kafka的本质是flume起一个java进程,监控并消费待同步的kafka集群的topic,并写入到需要同步到的kafka集群的kafka,所以在配置时,Source为Kafka Source,Sink为Kfka Sink。
Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取消息。 如果运行了多个Kafka Source,则可以把它们配置到同一个消费者组,以便每个source都读取一组唯一的topic分区。目前支持Kafka 0.10.1.0以上版本。Kafka Sink 可以把数据发送到 Kafka topic上。
同步案例
1.源集群和目标集群(从源集群同步数据到目标集群)
集群 | 地址 | 版本 |
源集群 | 10.19.162.107(其中一个节点) | 2.13-3.0.2 |
目标集群 | 110.19.162.101(其中一个节点) | 2.13-2.7.0 |
在10.19.162.107和110.19.162.101上分别新建测试topic:test-flume-logs1
# 新建topic ./bin/kafka-topics.sh --create --bootstrap-server 10.19.162.107:9092 --replication-factor 2 --partitions 1 --topic test-flume-logs1 |
这里使用python脚本往10.19.162.107的topic:test-flume-logs1写入100000条测点数据,python脚本如下:
import os from pykafka import KafkaClient dirPath = 'E:\\log\\测点数据' fileList = os.listdir(dirPath) client = KafkaClient(hosts="10.19.162.107:9092,10.19.162.108:9092,10.19.162.109:9092") topic = client.topics['test-flume-logs1'] print(len(fileList)) f2 = open('E:\\log\\all.txt',mode="w",encoding='utf-8') for j in range(10): with topic.get_sync_producer() as producer: for i in range(len(fileList)): fileName = dirPath + "\\" + fileList[i] f = open(fileName,encoding='utf-8') line = f.readline() # 以下是文件读取伪代码 while line: line = line + '\n' f2.write(line) # 写入kafka producer.produce(line.encode()) # print(line) line = f.readline() f.close() f2.close() |
写入后查看10.19.162.107的test-flume-logs1中消息的数量:
在如下位置新建flume同步的配置文件kafka-to-kafka.conf
配置文件内容如下:
#kafka相关source和sink参数可以参考如下网址 # 中文翻译版:https://flume.liyifeng.org/#kafka-sink # 英文原版: https://flume.apache.org/releases/content/1.10.1/FlumeUserGuide.html#kafka-sink # Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source a2.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a2.sources.r1.batchSize = 1000 a2.sources.r1.batchDurationMillis = 2000 # 此处hadoop01,hadoop02,hadoop03在host文件中已配置,ip分别为10.19.162.107,10.19.162.108,10.19.162.109 a2.sources.r1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092 a2.sources.r1.kafka.topics = test-flume-logs1 a2.sources.r1.kafka.consumer.group.id = testgruoup a2.sources.r1.migrateZookeeperOffsets = false a2.sources.r1.kafka.consumer.auto.offset.reset = earliest ## Source 拦截器,如果不指定拦截器,flume有bug,sink会使用最上方配置的kafka配置 a2.sources.r1.interceptors = i1 a2.sources.r1.interceptors.i1.type = static a2.sources.r1.interceptors.i1.key = topic a2.sources.r1.interceptors.i1.preserveExisting = false a2.sources.r1.interceptors.i1.value = test-flume-logs1 # Describe the sink a2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a2.sinks.k1.kafka.topic = test-flume-logs1 a2.sinks.k1.kafka.bootstrap.servers = 10.19.162.101:9092,10.19.162.102:9092,10.19.162.105:9092 a2.sinks.k1.kafka.flumeBatchSize = 20 a2.sinks.k1.kafka.producer.acks = 1 a2.sinks.k1.kafka.producer.linger.ms = 1 a2.sinks.ki.kafka.producer.compression.type = snappy # Use a channel which buffers events in memory a2.channels.c1.type = memory a2.channels.c1.capacity = 2000 a2.channels.c1.transactionCapacity = 1000 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1 |
写入前,查看10.19.162.101的test-flume-logs1中消息的数量:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.19.162.107:9092 --topic test-flume-logs1 --time -1 |
使用如下指令开启flume:
# 其中9001位flume的状态监控端口 bin/flume-ng agent --conf conf --conf-file conf/kafka-to-kafka.conf --name a2 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=9001 |
执行完之后查看10.19.162.101的test-flume-logs1中消息的数量,可以看到已经全部导入:
访问http://10.19.162.107:9001/metrics可以查看flume运行状态:
格式化后如下:
{ "SINK.k1": { "ConnectionCreatedCount": "0", "BatchCompleteCount": "0", "BatchEmptyCount": "26", "EventDrainAttemptCount": "100000", # event数量,也就是从源kafka的topic中读取的数量 "StartTime": "1666580989920", "BatchUnderflowCount": "0", "ConnectionFailedCount": "0", "ConnectionClosedCount": "0", "Type": "SINK", "RollbackCount": "0", "EventDrainSuccessCount": "100000", # 成功写入的数量 "KafkaEventSendTimer": "9861", "StopTime": "0" }, "CHANNEL.c1": { "ChannelCapacity": "2000", "ChannelFillPercentage": "0.0", "Type": "CHANNEL", "ChannelSize": "0", "EventTakeSuccessCount": "100000", "EventTakeAttemptCount": "100027", "StartTime": "1666580989317", "EventPutAttemptCount": "100000", "EventPutSuccessCount": "100000", "StopTime": "0" }, "SOURCE.r1": { "KafkaEventGetTimer": "4470", "AppendBatchAcceptedCount": "0", "EventAcceptedCount": "100000", "AppendReceivedCount": "0", "StartTime": "1666580989944", "AppendBatchReceivedCount": "0", "KafkaCommitTimer": "257", "EventReceivedCount": "100000", "Type": "SOURCE", "AppendAcceptedCount": "0", "OpenConnectionCount": "0", "KafkaEmptyCount": "30", "StopTime": "0" } } |
指标项说明
source监控项
objectName (会随实际情况而变化) | 指标项 | 说明 |
---|---|---|
org.apache.flume.source:type=r1 | OpenConnectionCount | 目前与客户端或sink保持连接的总数量 |
org.apache.flume.source:type=r1 | AppendBatchAcceptedCount | 成功提交到channel的批次的总数量 |
org.apache.flume.source:type=r1 | AppendBatchReceivedCount | 接收到事件批次的总数量 |
org.apache.flume.source:type=r1 | AppendAcceptedCount | 逐条录入的次数 |
org.apache.flume.source:type=r1 | AppendReceivedCount | 每批只有一个事件的事件总数量 |
org.apache.flume.source:type=r1 | EventAcceptedCount | 成功写出到channel的事件总数量 |
org.apache.flume.source:type=r1 | EventReceivedCount | 目前为止source已经接收到的事件总数量 |
org.apache.flume.source:type=r1 | StartTime | source启动时的毫秒值时间 |
org.apache.flume.source:type=r1 | StopTime | source停止时的毫秒值时间,为0表示一直在运行 |
channel监控项
objectName (会随实际情况而变化) | 指标项 | 说明 |
---|---|---|
org.apache.flume.channel:type=c1 | EventPutAttemptCount | Source尝试写入Channe的事件总次数 |
org.apache.flume.channel:type=c1 | EventPutSuccessCount | 成功写入channel且提交的事件总次数 |
org.apache.flume.channel:type=c1 | EventTakeAttemptCount | sink尝试从channel拉取事件的总次数。 |
org.apache.flume.channel:type=c1 | EventTakeSuccessCount | sink成功从channel读取事件的总数量 |
org.apache.flume.channel:type=c1 | ChannelSize | 目前channel中事件的总数量 |
org.apache.flume.channel:type=c1 | ChannelCapacity | channel的容量 |
org.apache.flume.channel:type=c1 | ChannelFillPercentage | channel已填入的百分比 |
org.apache.flume.channel:type=c1 | StartTime | channel启动时的毫秒值时间 |
org.apache.flume.channel:type=c1 | StopTime | channel停止时的毫秒值时间,为0表示一直在运行 |
sink监控项
objectName (会随实际情况而变化) | 指标项 | 说明 |
---|---|---|
org.apache.flume.sink:type=k1 | ConnectionCreatedCount | 创建的连接数量 |
org.apache.flume.sink:type=k1 | ConnectionClosedCount | 关闭的连接数量 |
org.apache.flume.sink:type=k1 | ConnectionFailedCount | 由于错误关闭的连接数量 |
org.apache.flume.sink:type=k1 | BatchEmptyCount | 批量处理event的个数为0的数量-表示source写入数据的速度比sink处理数据的速度慢 |
org.apache.flume.sink:type=k1 | BatchUnderflowCount | 批量处理event的个数小于批处理大小的数量 |
org.apache.flume.sink:type=k1 | BatchCompleteCount | 批量处理event的个数等于批处理大小的数量 |
org.apache.flume.sink:type=k1 | EventDrainAttemptCount | sink尝试写出到存储的事件总数量 |
org.apache.flume.sink:type=k1 | EventDrainSuccessCount | sink成功写出到存储的事件总数量 |
org.apache.flume.sink:type=k1 | StartTime | channel启动时的毫秒值时间 |
org.apache.flume.sink:type=k1 | StopTime | channel停止时的毫秒值时间,为0表示一直在运行 |
更多flume监控可以参考:
更多推荐
所有评论(0)