SpringBoot+KafkaConnect功能+监控实践(持续更新)
手把手实现kafka connect安装启动,实现连接mysql查询数据,插入数据,并实现通过spring boot项目操作connect和获取统计信息
·
版权说明: 本文由博主keep丶原创,转载请注明出处。
原文地址: https://blog.csdn.net/qq_38688267/article/details/120729471
文章目录
前言
实践
环境搭建
作者是在linux环境中搭建的,建议在linux环境测试。
kafka安装运行
-
下载
传送门 -
解压
# 解压
tar -xzf kafka_2.13-3.0.0.tgz
# 进入主目录
cd kafka_2.13-3.0.0
# 启动zk
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动kafka
bin/kafka-server-start.sh config/server.properties
启动kafka connect
# 第一个properties是启动connect的配置;
# 第二个properties是connector的配置,会根据配置创建一个connector
bin/connect-standalone.sh \
config/connect-standalone.properties \
connect-file-sink.properties
功能实现
实现mysql数据读取和插入
kafka官方包里面只支持几种简单的connector类型,像JDBC这些是不支持的,因此需要额外添加。
- 直接linux下载
# 需要安装wget工具,如果没有
yum install -y wget
# 下载
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.2.4/confluentinc-kafka-connect-jdbc-10.2.4.zip
- 安装
# 先解压
tar -xzf confluentinc-kafka-connect-jdbc-10.2.3.zip
# 将该工具里的lib下面的包,复制到kafka/lib下即可
cp confluentinc-kafka-connect-jdbc-10.2.3/lib/* KAFKA_HOME/lib
# 再重启kafka和connect
-
windows下载JDBC连接包,再上传到Linux
下载地址 -
添加一个MySQL source connector;
source
相当于消息生产者,从指定对象中(这里的MySQL的test_zzf表中的数据)读取到指定主题中。
curl -X POST -H "Content-Type: application/json" \
-d '{
"name":"test_mysql_source",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"mode":"bulk",
"tables":"`zzf`.`test_zzf`",
"task.class":"io.confluent.connect.jdbc.source.JdbcSourceTask",
"tasks.max":"2",
"topics":"mysql_test",
"name":"test_mysql",
"connection.url":"jdbc:mysql://localhost:3306/zzf?user=root&password=123456",
"table.whitelist":"test_zzf"
}
}'
- 添加一个MySQL sink connector,
sink
相当于消费者,即会将指定主题中的数据消费到指定对象中(这里是mysql的test_zzf1表中)
curl -X POST -H "Content-Type: application/json" \
-d '{
"name":"test_mysql_sink",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"mode":"bulk",
"tables":"`zzf`.`test_zzf1`",
"task.class":"io.confluent.connect.jdbc.source.JdbcSinkTask",
"tasks.max":"2",
"topics":"mysql_test",
"name":"test_mysql",
"connection.url":"jdbc:mysql://localhost:3306/zzf?user=root&password=123456",
"table.whitelist":"test_zzf"
}
}'
开启JMX监控
-
请参考Kafka开启JMX监控
-
需要注意的是,修改
kafka_run_class.sh
,在第一行增加JMX_PORT=9527
。在完成kafka connect
启动后,要将该行注释!否则kafka运行其他功能时会报错! -
关于JMX中的指标,在官网中有介绍,传送门
spring boot 项目集成
集成Kafka Connector
- 添加官方依赖
<dependency>
<groupId>org.sourcelab</groupId>
<artifactId>kafka-connect-client</artifactId>
<version>3.1.1</version>
</dependency>
- 配置类
@Configuration
@ConditionalOnProperty(value = "kafka.connect.enabled", havingValue = "true")
public class KafkaConnectorClientConfig {
@Value("${kafka.connect.host}")
private String connectHost;
/**
* 初始化kafka connector连接客户端
*/
@Bean
public ConnectorClient connectorClient(){
org.sourcelab.kafka.connect.apiclient.Configuration configuration = new org.sourcelab.kafka.connect.apiclient.Configuration(connectHost);
HttpClientRestClient httpClientRestClient = new HttpClientRestClient();
httpClientRestClient.init(configuration);
return new ConnectorClient(httpClientRestClient);
}
- 客户端封装类
@AllArgsConstructor
public class ConnectorClient {
@Getter
private final HttpClientRestClient client;
/*
kafka connect 官方依赖中并没有很好的封装出返回值对象,而是通过统一的ResonseStr字符串返回。
因此为了方便业务使用,我们可以自己创建对应的返回值对象,封装用到的方法。
*/
/**
* 查询connector状态
* @param connectorName 连接器名
*/
public ConnectorStatusRep connectorStatus(String connectorName){
RestResponse response = client.submitRequest(new GetConnectorStatus(connectorName));
if(fail(response)) {
throw new RuntimeException("请求kafka connector失败!返回信息:" + response.getResponseStr());
}
return JSON.parseObject(response.getResponseStr(), ConnectorStatusRep.class);
}
private boolean success(RestResponse response) {
return HttpStatus.OK.value() == response.getHttpCode();
}
private boolean fail(RestResponse response) {
return !success(response);
}
}
集成kafka Monitor
- 配置类
@Configuration
@ConditionalOnProperty(value = "kafka.jmx.enabled", havingValue = "true")
public class KafkaMonitorConfig {
@Value("${kafka.jmx.host}")
private String jmxHost;
@Bean
KafkaMonitorClient kafkaMonitorClient(){
return new KafkaMonitorClient(jmxHost);
}
}
- 客户端封装类
public class KafkaMonitorClient {
private MBeanServerConnection conn;
public KafkaMonitorClient(String jmxHost) {
String jmxURL = "service:jmx:rmi:///jndi/rmi://" + jmxHost + "/jmxrmi";
JMXServiceURL serviceURL;
try {
serviceURL = new JMXServiceURL(jmxURL);
JMXConnector connector = JMXConnectorFactory.connect(serviceURL, null);
conn = connector.getMBeanServerConnection();
} catch (IOException e) {
throw new RuntimeException("连接kafka connect jmx 失败!" + e.getMessage());
}
if (conn == null) {
throw new IllegalArgumentException("获取Kafka JMX 连接为空!");
}
}
/**
* 获取处理总条数
*
* @param connectorName 连接器名称
* @param clazz 连接器类名
* @param taskId 任务id
*/
public Long getRecordTotal(String connectorName, String clazz, Integer taskId) {
Object attribute;
try {
attribute = conn.getAttribute(
KafkaMonitorMetricsConstant.getSourceTaskObjectName(connectorName, clazz, taskId),
KafkaMonitorMetricsConstant.Attribute.RECORD_TOTAL
);
} catch (IOException e) {
throw new BusinessException("获取统计信息失败! 信息如下:" + e.getMessage());
} catch (Exception e) {
return 0L;
}
if (attribute == null) {
return 0L;
}
return resultHandle(attribute.toString());
}
/**
* 获取处理总条数
*
* @param connectorName 连接器名称
* @param taskId 任务id
*/
@SneakyThrows
public Long getRecordFailure(String connectorName, Integer taskId) {
Object attribute = conn.getAttribute(
KafkaMonitorMetricsConstant.getTaskErrorObjectName(connectorName, taskId),
KafkaMonitorMetricsConstant.Attribute.RECORD_ERROR
);
if (attribute == null) {
return 0L;
}
return resultHandle(attribute);
}
private long resultHandle(Object attribute) {
String s = attribute.toString();
if (s.contains(StringPool.DOT)) {
return Long.parseLong(s.substring(0, s.indexOf(StringPool.DOT)));
}
return Long.parseLong(s);
}
/**
* 获取处理总条数
*
* @param connectorName 连接器名称
* @param taskId 任务id
*/
public Long getNetworkByte(String connectorName, String clazz, Integer taskId) {
Object attribute;
try {
attribute = conn.getAttribute(
KafkaMonitorMetricsConstant.getObjectName(connectorName, clazz, taskId),
KafkaMonitorMetricsConstant.Attribute.NETWORK_TOTAL
);
} catch (IOException e) {
throw new BusinessException("获取统计信息失败! 信息如下:" + e.getMessage());
} catch (Exception e) {
return 0L;
}
if (attribute == null) {
return 0L;
}
return resultHandle(attribute.toString());
}
}
- JMX请求枚举
/**
* kafka JMX 监控指标枚举类
* <p>
* more info: http://kafka.apache.org/documentation.html#selector_monitoring
*/
public interface KafkaMonitorMetricsConstant {
/**
* 指标元素
*/
interface Attribute {
// 失败条数
String RECORD_ERROR = "total-record-failures";
// 处理总条数
String RECORD_TOTAL = "source-record-poll-total";
// 总流量
String NETWORK_TOTAL = "network-io-total";
// 出网流量
String NETWORK_OUT = "outgoing-byte-total";
// 入网流量
String NETWORK_IN = "incoming-byte-total";
}
String SOURCE_KEY = "Source";
String SINK_KEY = "Sink";
String sourceVal = "source";
String sinkVal = "sink";
/**
* 任务错误信息 JMX ObjectName
*/
@SneakyThrows
static ObjectName getTaskErrorObjectName(String connectorName, Integer taskId) {
return ObjectName.getInstance("kafka.connect:type=task-error-metrics,connector=" + connectorName + ",task=" + taskId);
}
/**
* source任务 JMX ObjectName
*/
@SneakyThrows
static ObjectName getSourceTaskObjectName(String connectorName, String clazz, Integer taskId) {
return ObjectName.getInstance("kafka.connect:type=" + getTypeByClazz(clazz) + "-task-metrics,connector=" + connectorName + ",task=" + taskId);
}
/**
* 提供者监控 JMX ObjectName
*/
@SneakyThrows
static ObjectName getObjectName(String connectorName, String clazz, Integer taskId) {
return ObjectName.getInstance("kafka.producer:type=" + getInstanceByClazz(clazz) + "-metrics,client-id=connector-producer-" + connectorName + "-" + taskId);
}
/**
* source对应生产者,sink对应消费者,因此其监控所在的instance名不一致。
*/
static String getInstanceByClazz(String connectorClazz){
if(connectorClazz.contains(SOURCE_KEY)) {
return "producer";
} else if (connectorClazz.contains(SINK_KEY)) {
return "consumer";
}
throw new BusinessException();
}
static String getTypeByClazz(String clazz) {
if(clazz.contains(SOURCE_KEY)) {
return "source";
} else if (clazz.contains(SINK_KEY)) {
return "sink";
}
throw new BusinessException();
}
附录
添加额外连接器
kafka官方包里面只支持几种简单的connector类型,像JDBC这些是不支持的,因此需要额外添加
地址:https://www.confluent.io/hub
connect URI介绍
请求方法 | 路径 | 含义 |
---|---|---|
GET | /connectors | 返回所有正在运行的connector名。 |
GET | /connectors/{name} | 获取指定connetor的信息。 |
GET | /connectors/{name}/config | 获取指定connector的配置信息 |
GET | /connectors/{name}/status | 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。 |
GET | /connectors/{name}/tasks | 获取指定connector正在运行的task。 |
GET | /connectors/{name}/tasks/{taskid}/status | 获取指定connector的task的状态信息。 |
PUT | /connectors/{name}/config | 更新指定connector的配置信息 |
PUT | /connectors/{name}/pause | 暂停connector和它的task,停止数据处理知道它被恢复 |
PUT | /connectors/{name}/resume | 恢复一个被暂停的connector。 |
POST | /connectors | 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。 |
POST | /connectors/{name}/restart | 重启一个connector,尤其是在一个connector运行失败的情况下比较常用 |
POST | /connectors/{name}/tasks/{taskId}/restart | 重启一个task,一般是因为它运行失败才这样做。 |
DELETE | /connectors/{name} | 删除一个connector,停止它的所有task并删除配置。 |
更多推荐
已为社区贡献1条内容
所有评论(0)