版权说明: 本文由博主keep丶原创,转载请注明出处。
原文地址: https://blog.csdn.net/qq_38688267/article/details/120729471

前言

实践

环境搭建

  作者是在linux环境中搭建的,建议在linux环境测试。

kafka安装运行

# 解压
tar -xzf kafka_2.13-3.0.0.tgz
# 进入主目录
cd kafka_2.13-3.0.0
# 启动zk
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动kafka
bin/kafka-server-start.sh config/server.properties

启动kafka connect

# 第一个properties是启动connect的配置;
# 第二个properties是connector的配置,会根据配置创建一个connector
bin/connect-standalone.sh \
	config/connect-standalone.properties \
	connect-file-sink.properties

功能实现

实现mysql数据读取和插入

  kafka官方包里面只支持几种简单的connector类型,像JDBC这些是不支持的,因此需要额外添加。

  • 直接linux下载
# 需要安装wget工具,如果没有
yum install -y wget

# 下载
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.2.4/confluentinc-kafka-connect-jdbc-10.2.4.zip
  • 安装
# 先解压
tar -xzf confluentinc-kafka-connect-jdbc-10.2.3.zip
# 将该工具里的lib下面的包,复制到kafka/lib下即可
cp confluentinc-kafka-connect-jdbc-10.2.3/lib/* KAFKA_HOME/lib
# 再重启kafka和connect
  • windows下载JDBC连接包,再上传到Linux
    下载地址

  • 添加一个MySQL source connector; source相当于消息生产者,从指定对象中(这里的MySQL的test_zzf表中的数据)读取到指定主题中。

curl -X POST -H "Content-Type: application/json" \
-d '{
	"name":"test_mysql_source", 
	"config":{
		"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
		"mode":"bulk",
		"tables":"`zzf`.`test_zzf`",
		"task.class":"io.confluent.connect.jdbc.source.JdbcSourceTask",
		"tasks.max":"2",
		"topics":"mysql_test",
		"name":"test_mysql",
		"connection.url":"jdbc:mysql://localhost:3306/zzf?user=root&password=123456",
		"table.whitelist":"test_zzf"
	}
}'
  • 添加一个MySQL sink connector,sink相当于消费者,即会将指定主题中的数据消费到指定对象中(这里是mysql的test_zzf1表中)
curl -X POST -H "Content-Type: application/json" \
-d '{
	"name":"test_mysql_sink", 
	"config":{
		"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
		"mode":"bulk",
		"tables":"`zzf`.`test_zzf1`",
		"task.class":"io.confluent.connect.jdbc.source.JdbcSinkTask",
		"tasks.max":"2",
		"topics":"mysql_test",
		"name":"test_mysql",
		"connection.url":"jdbc:mysql://localhost:3306/zzf?user=root&password=123456",
		"table.whitelist":"test_zzf"
	}
}'

开启JMX监控

  • 请参考Kafka开启JMX监控

  • 需要注意的是,修改kafka_run_class.sh,在第一行增加JMX_PORT=9527。在完成kafka connect启动后,要将该行注释!否则kafka运行其他功能时会报错!

  • 关于JMX中的指标,在官网中有介绍,传送门

spring boot 项目集成

集成Kafka Connector

  • 添加官方依赖
        <dependency>
            <groupId>org.sourcelab</groupId>
            <artifactId>kafka-connect-client</artifactId>
            <version>3.1.1</version>
        </dependency>
  • 配置类

@Configuration
@ConditionalOnProperty(value = "kafka.connect.enabled", havingValue = "true")
public class KafkaConnectorClientConfig {

    @Value("${kafka.connect.host}")
    private String connectHost;

    /**
     * 初始化kafka connector连接客户端
     */
    @Bean
    public ConnectorClient connectorClient(){
        org.sourcelab.kafka.connect.apiclient.Configuration configuration = new org.sourcelab.kafka.connect.apiclient.Configuration(connectHost);
        HttpClientRestClient httpClientRestClient = new HttpClientRestClient();
        httpClientRestClient.init(configuration);
        return new ConnectorClient(httpClientRestClient);
    }

  • 客户端封装类
@AllArgsConstructor
public class ConnectorClient {

    @Getter
    private final HttpClientRestClient client;


	/*
		kafka connect 官方依赖中并没有很好的封装出返回值对象,而是通过统一的ResonseStr字符串返回。
		因此为了方便业务使用,我们可以自己创建对应的返回值对象,封装用到的方法。
	*/
    /**
     * 查询connector状态
     * @param connectorName 连接器名
     */
    public ConnectorStatusRep connectorStatus(String connectorName){
        RestResponse response = client.submitRequest(new GetConnectorStatus(connectorName));
        if(fail(response)) {
            throw new RuntimeException("请求kafka connector失败!返回信息:" + response.getResponseStr());
        }
        return JSON.parseObject(response.getResponseStr(), ConnectorStatusRep.class);
    }

    private boolean success(RestResponse response) {
        return HttpStatus.OK.value() == response.getHttpCode();
    }

    private boolean fail(RestResponse response) {
        return !success(response);
    }
}

集成kafka Monitor

  • 配置类
@Configuration
@ConditionalOnProperty(value = "kafka.jmx.enabled", havingValue = "true")
public class KafkaMonitorConfig {

    @Value("${kafka.jmx.host}")
    private String jmxHost;


    @Bean
    KafkaMonitorClient kafkaMonitorClient(){
        return new KafkaMonitorClient(jmxHost);
    }
}
  • 客户端封装类

public class KafkaMonitorClient {

    private MBeanServerConnection conn;

    public KafkaMonitorClient(String jmxHost) {
        String jmxURL = "service:jmx:rmi:///jndi/rmi://" + jmxHost + "/jmxrmi";
        JMXServiceURL serviceURL;
        try {
            serviceURL = new JMXServiceURL(jmxURL);
            JMXConnector connector = JMXConnectorFactory.connect(serviceURL, null);
            conn = connector.getMBeanServerConnection();
        } catch (IOException e) {
            throw new RuntimeException("连接kafka connect jmx 失败!" + e.getMessage());
        }
        if (conn == null) {
            throw new IllegalArgumentException("获取Kafka JMX 连接为空!");
        }
    }

    /**
     * 获取处理总条数
     *
     * @param connectorName 连接器名称
     * @param clazz         连接器类名
     * @param taskId        任务id
     */
    public Long getRecordTotal(String connectorName, String clazz, Integer taskId) {
        Object attribute;
        try {
            attribute = conn.getAttribute(
                    KafkaMonitorMetricsConstant.getSourceTaskObjectName(connectorName, clazz, taskId),
                    KafkaMonitorMetricsConstant.Attribute.RECORD_TOTAL
            );
        } catch (IOException e) {
            throw new BusinessException("获取统计信息失败! 信息如下:" + e.getMessage());
        } catch (Exception e) {
            return 0L;
        }
        if (attribute == null) {
            return 0L;
        }
        return resultHandle(attribute.toString());
    }

    /**
     * 获取处理总条数
     *
     * @param connectorName 连接器名称
     * @param taskId        任务id
     */
    @SneakyThrows
    public Long getRecordFailure(String connectorName, Integer taskId) {
        Object attribute = conn.getAttribute(
                KafkaMonitorMetricsConstant.getTaskErrorObjectName(connectorName, taskId),
                KafkaMonitorMetricsConstant.Attribute.RECORD_ERROR
        );
        if (attribute == null) {
            return 0L;
        }
        return resultHandle(attribute);
    }

    private long resultHandle(Object attribute) {
        String s = attribute.toString();
        if (s.contains(StringPool.DOT)) {
            return Long.parseLong(s.substring(0, s.indexOf(StringPool.DOT)));
        }
        return Long.parseLong(s);
    }

    /**
     * 获取处理总条数
     *
     * @param connectorName 连接器名称
     * @param taskId        任务id
     */
    public Long getNetworkByte(String connectorName, String clazz, Integer taskId) {
        Object attribute;
        try {
            attribute = conn.getAttribute(
                    KafkaMonitorMetricsConstant.getObjectName(connectorName, clazz, taskId),
                    KafkaMonitorMetricsConstant.Attribute.NETWORK_TOTAL
            );
        } catch (IOException e) {
            throw new BusinessException("获取统计信息失败! 信息如下:" + e.getMessage());
        } catch (Exception e) {
            return 0L;
        }
        if (attribute == null) {
            return 0L;
        }
        return resultHandle(attribute.toString());
    }
}
  • JMX请求枚举
/**
 * kafka JMX 监控指标枚举类
 * <p>
 * more info: http://kafka.apache.org/documentation.html#selector_monitoring
 */
public interface KafkaMonitorMetricsConstant {

    /**
     * 指标元素
     */
    interface Attribute {
        // 失败条数
        String RECORD_ERROR = "total-record-failures";
        // 处理总条数
        String RECORD_TOTAL = "source-record-poll-total";
        // 总流量
        String NETWORK_TOTAL = "network-io-total";
        // 出网流量
        String NETWORK_OUT = "outgoing-byte-total";
        // 入网流量
        String NETWORK_IN = "incoming-byte-total";
    }

    String SOURCE_KEY = "Source";

    String SINK_KEY = "Sink";

    String sourceVal = "source";

    String sinkVal = "sink";


    /**
     * 任务错误信息 JMX ObjectName
     */
    @SneakyThrows
    static ObjectName getTaskErrorObjectName(String connectorName, Integer taskId) {
        return ObjectName.getInstance("kafka.connect:type=task-error-metrics,connector=" + connectorName + ",task=" + taskId);
    }

    /**
     * source任务 JMX ObjectName
     */
    @SneakyThrows
    static ObjectName getSourceTaskObjectName(String connectorName, String clazz, Integer taskId) {
        return ObjectName.getInstance("kafka.connect:type=" + getTypeByClazz(clazz) + "-task-metrics,connector=" + connectorName + ",task=" + taskId);
    }

    /**
     * 提供者监控 JMX ObjectName
     */
    @SneakyThrows
    static ObjectName getObjectName(String connectorName, String clazz, Integer taskId) {
        return ObjectName.getInstance("kafka.producer:type=" + getInstanceByClazz(clazz) + "-metrics,client-id=connector-producer-" + connectorName + "-" + taskId);
    }


    /**
     * source对应生产者,sink对应消费者,因此其监控所在的instance名不一致。
     */
    static String getInstanceByClazz(String connectorClazz){
        if(connectorClazz.contains(SOURCE_KEY)) {
            return "producer";
        } else if (connectorClazz.contains(SINK_KEY)) {
            return "consumer";
        }
        throw new BusinessException();
    }

    static String getTypeByClazz(String clazz) {
        if(clazz.contains(SOURCE_KEY)) {
            return "source";
        } else if (clazz.contains(SINK_KEY)) {
            return "sink";
        }
        throw new BusinessException();
    }

附录

添加额外连接器

  kafka官方包里面只支持几种简单的connector类型,像JDBC这些是不支持的,因此需要额外添加
地址:https://www.confluent.io/hub

connect URI介绍

请求方法路径含义
GET/connectors返回所有正在运行的connector名。
GET/connectors/{name}获取指定connetor的信息。
GET/connectors/{name}/config获取指定connector的配置信息
GET/connectors/{name}/status获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET/connectors/{name}/tasks获取指定connector正在运行的task。
GET/connectors/{name}/tasks/{taskid}/status获取指定connector的task的状态信息。
PUT/connectors/{name}/config更新指定connector的配置信息
PUT/connectors/{name}/pause暂停connector和它的task,停止数据处理知道它被恢复
PUT/connectors/{name}/resume恢复一个被暂停的connector。
POST/connectors新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
POST/connectors/{name}/restart重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST/connectors/{name}/tasks/{taskId}/restart重启一个task,一般是因为它运行失败才这样做。
DELETE/connectors/{name}删除一个connector,停止它的所有task并删除配置。
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐