In the previous chapter, Billow showed how to deliver binlog data to Kafka using Canal 1.1.1 or later. In real production environments, Kafka clusters are often secured with Kerberos authentication. In this section, Billow explains how to modify the Canal source code so that it can be configured to deliver data to a Kerberos-authenticated Kafka cluster.

## 1. Importing the Canal source code

Canal is open-sourced on GitHub: https://github.com/alibaba/canal.git

#### 1.1 Import the git project into IDEA

The project structure after import:

#### 1.2 Modify the Canal launcher class

The entry class of the standalone Canal deployer is com.alibaba.otter.canal.deployer.CanalLauncher.

The main method of this class does two things:
1. Load the configuration.
2. Start a CanalStater based on that configuration.

```java
...
logger.info("## load canal configurations");
String conf = System.getProperty("canal.conf", "classpath:canal.properties");
Properties properties = new Properties();
RemoteConfigLoader remoteConfigLoader = null;
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
    conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
    properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
    properties.load(new FileInputStream(conf));
}
...
final CanalStater canalStater = new CanalStater();
canalStater.start(properties);
```
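The classpath-prefix handling above can be sketched in isolation. The class and method names below are mine, not Canal's; this is only a simplified stand-in for what CanalLauncher does:

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class ConfLoaderSketch {

    static final String CLASSPATH_URL_PREFIX = "classpath:";

    // A "classpath:" prefix resolves the file via the class loader;
    // anything else is treated as a plain filesystem path.
    static Properties load(String conf) throws IOException {
        Properties props = new Properties();
        if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
            String resource = conf.substring(CLASSPATH_URL_PREFIX.length());
            try (InputStream in = ConfLoaderSketch.class.getClassLoader()
                    .getResourceAsStream(resource)) {
                props.load(in);
            }
        } else {
            try (InputStream in = new FileInputStream(conf)) {
                props.load(in);
            }
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Demonstrate the filesystem branch with a throwaway properties file
        Path tmp = Files.createTempFile("canal", ".properties");
        Files.write(tmp, "canal.mq.servers = 127.0.0.1:6667\n".getBytes());
        System.out.println(load(tmp.toString()).getProperty("canal.mq.servers"));
        // prints: 127.0.0.1:6667
    }
}
```

This is also why `canal.conf` defaults to `classpath:canal.properties` but can be pointed at any absolute path with `-Dcanal.conf=`.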

The CanalStater.start method initializes the MQ producer according to the configuration. Billow uses Kafka here, so we only care about the Kafka branch.

After CanalKafkaProducer is initialized, the MQ settings are read from the configuration file.

The MQ section of canal.properties looks like this:

```properties
##################################################
#########               MQ            #############
##################################################
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false

canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "/usr/keytab/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "/usr/keytab/jaas.conf"
```

The settings prefixed with canal.mq.kafka.kerberos are Billow's custom Kerberos configuration items:

- canal.mq.kafka.kerberos.enable
  Takes true or false. When true, the Kafka cluster has Kerberos authentication enabled, and the two settings below are read.

- canal.mq.kafka.kerberos.krb5FilePath
  Read only when canal.mq.kafka.kerberos.enable is true; the path to the krb5.conf file of the Kerberos cluster. Example:

```ini
[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 default_realm = HADOOP.COM
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true

[realms]
 HADOOP.COM = {
  kdc = hadoop1.com
  admin_server = hadoop1.com
 }

[domain_realm]
 .hadoop1.com = HADOOP.COM
 hadoop1.com = HADOOP.COM
```

- canal.mq.kafka.kerberos.jaasFilePath
  Read only when canal.mq.kafka.kerberos.enable is true; the JAAS configuration used when connecting to Kafka. Example:

```ini
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="E:/resources/billow.keytab"
   principal="billow@HADOOP.COM"
   client=true;
};
```
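Both file paths are only consumed at startup, and a wrong path tends to surface later as an opaque login failure inside the Kafka client. A small pre-flight check like the following can be added before the system properties are set; the class and method names here are mine, not part of Canal:

```java
import java.io.File;

public class KerberosPreflight {

    // Returns true only if both Kerberos config files exist and are readable.
    // Failing fast here gives a clearer error than a GSSAPI login failure
    // deep inside the Kafka producer.
    static boolean filesReadable(String krb5Path, String jaasPath) {
        File krb5 = new File(krb5Path);
        File jaas = new File(jaasPath);
        return krb5.isFile() && krb5.canRead() && jaas.isFile() && jaas.canRead();
    }

    public static void main(String[] args) {
        System.out.println(filesReadable("/no/such/krb5.conf", "/no/such/jaas.conf"));
        // prints: false
    }
}
```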

Since these custom items now appear in the configuration file, the code must be extended to read them. The reading is added in CanalStater's buildMQProperties method:

```java
/**
 * Build the MQ configuration
 *
 * @param properties the canal.properties configuration
 * @return
 */
private static MQProperties buildMQProperties(Properties properties) {
    MQProperties mqProperties = new MQProperties();
    ...
    String kafkaKerberosEnable = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
    if (!StringUtils.isEmpty(kafkaKerberosEnable)) {
        mqProperties.setKerberosEnable(Boolean.valueOf(kafkaKerberosEnable));
    }
    String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
    if (!StringUtils.isEmpty(kafkaKerberosKrb5Filepath)) {
        mqProperties.setKerberosKrb5FilePath(kafkaKerberosKrb5Filepath);
    }
    String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
    if (!StringUtils.isEmpty(kafkaKerberosJaasFilepath)) {
        mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);
    }
    return mqProperties;
}
```
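One detail worth noting in the pattern above: Boolean.valueOf returns true only for the exact string "true" (case-insensitive), so a typo such as `enable = ture` silently disables Kerberos rather than failing. A standalone sketch of the parsing, where MQProps is a hypothetical stand-in for Canal's MQProperties:

```java
import java.util.Properties;

public class KerberosPropsSketch {

    // Hypothetical stand-in for the three Kerberos fields of MQProperties.
    static class MQProps {
        boolean kerberosEnable = false;   // default: disabled
        String krb5FilePath;
        String jaasFilePath;
    }

    static MQProps parse(Properties properties) {
        MQProps mq = new MQProps();
        String enable = properties.getProperty("canal.mq.kafka.kerberos.enable");
        if (enable != null && !enable.isEmpty()) {
            // Boolean.valueOf: anything other than "true" (ignoring case) is false
            mq.kerberosEnable = Boolean.valueOf(enable.trim());
        }
        mq.krb5FilePath = properties.getProperty("canal.mq.kafka.kerberos.krb5FilePath");
        mq.jaasFilePath = properties.getProperty("canal.mq.kafka.kerberos.jaasFilePath");
        return mq;
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("canal.mq.kafka.kerberos.enable", "true");
        p.setProperty("canal.mq.kafka.kerberos.krb5FilePath", "/usr/keytab/krb5.conf");
        System.out.println(parse(p).kerberosEnable);
        // prints: true  (a misspelled value would print false)
    }
}
```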

Add the matching constants to the CanalConstants class:

```java
/**
 * Common launcher constants
 *
 * @author jianghang 2012-11-8 下午03:15:55
 * @version 1.0.0
 */
public class CanalConstants {
    ...
    public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE       = ROOT + "." + "mq.kafka.kerberos.enable";
    public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
    public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
    ...
}
```

#### 1.3 Configure CanalKafkaProducer

The previous section added the Kerberos switch and its related settings. Now let's make the Kafka producer itself run in secure mode.

Looking at the source, CanalStater's start method creates a CanalKafkaProducer, and that object's init method holds the KafkaProducer configuration. Billow added a check there: if Kerberos is enabled in the configuration, the producer properties are switched to the secure protocol and the required JVM system properties are set.

```java
if (kafkaProperties.isKerberosEnable()) {
    // the Kafka cluster has Kerberos authentication enabled
    System.setProperty("java.security.krb5.conf", kafkaProperties.getKerberosKrb5FilePath());
    System.setProperty("java.security.auth.login.config", kafkaProperties.getKerberosJaasFilePath());
    System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
    properties.put("security.protocol", "SASL_PLAINTEXT");
    properties.put("sasl.kerberos.service.name", "kafka");
}
```

In context, the full init method now looks like this:

```java
public class CanalKafkaProducer implements CanalMQProducer {

    ...

    @Override
    public void init(MQProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getServers());
        properties.put("acks", kafkaProperties.getAcks());
        properties.put("compression.type", kafkaProperties.getCompressionType());
        properties.put("batch.size", kafkaProperties.getBatchSize());
        properties.put("linger.ms", kafkaProperties.getLingerMs());
        properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
        properties.put("key.serializer", StringSerializer.class.getName());
        if (kafkaProperties.getTransaction()) {
            properties.put("transactional.id", "canal-transactional-id");
        } else {
            properties.put("retries", kafkaProperties.getRetries());
        }
        if (kafkaProperties.isKerberosEnable()) {
            // the Kafka cluster has Kerberos authentication enabled
            System.setProperty("java.security.krb5.conf", kafkaProperties.getKerberosKrb5FilePath());
            System.setProperty("java.security.auth.login.config", kafkaProperties.getKerberosJaasFilePath());
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.kerberos.service.name", "kafka");
        }
        if (!kafkaProperties.getFlatMessage()) {
            properties.put("value.serializer", MessageSerializer.class.getName());
            producer = new KafkaProducer<String, Message>(properties);
        } else {
            properties.put("value.serializer", StringSerializer.class.getName());
            producer2 = new KafkaProducer<String, String>(properties);
        }
        if (kafkaProperties.getTransaction()) {
            if (!kafkaProperties.getFlatMessage()) {
                producer.initTransactions();
            } else {
                producer2.initTransactions();
            }
        }
    }

    ...
}
```
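The Kerberos branch boils down to two JVM-wide system properties plus two producer settings. Assembled in isolation (class and method names are mine; the values mirror the snippet above):

```java
import java.util.Properties;

public class SecureProducerProps {

    // Builds only the security-related part of the producer config.
    // SASL_PLAINTEXT means Kerberos (GSSAPI) authentication without TLS;
    // SASL_SSL would be used if the brokers also encrypt traffic.
    static Properties securityProps(String krb5Path, String jaasPath) {
        // JVM-wide settings: which KDC config and which JAAS file to use.
        // Note these affect the whole process, not just this producer.
        System.setProperty("java.security.krb5.conf", krb5Path);
        System.setProperty("java.security.auth.login.config", jaasPath);
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

        // Per-producer settings; the service name must match the brokers'
        // Kerberos principal (kafka/<host>@REALM).
        Properties props = new Properties();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        return props;
    }

    public static void main(String[] args) {
        Properties p = securityProps("/usr/keytab/krb5.conf", "/usr/keytab/jaas.conf");
        System.out.println(p.getProperty("security.protocol"));
        // prints: SASL_PLAINTEXT
    }
}
```

Because the settings are System properties, they are shared by everything in the JVM; that is fine for the standalone deployer, but worth keeping in mind if Canal is ever embedded in a larger process.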

## 2. Testing

After modifying the source, compile and package it:

```shell
mvn clean install -Dmaven.test.skip -Denv=release
```

When the command succeeds, the deployer archive is generated under the project's target folder.

Copy the deployer package to the server, set up krb5.conf, jaas.conf, and canal.properties for the cluster environment, then start Canal, check the logs, and start a Kafka consumer to verify that data arrives.
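The verification consumer needs the same SASL settings on the client side. A sketch (the paths and broker address reuse the examples from this article; adjust them to your cluster):

```shell
# Point the Kafka CLI tools at the same krb5/jaas files (example paths)
export KAFKA_OPTS="-Djava.security.krb5.conf=/usr/keytab/krb5.conf -Djava.security.auth.login.config=/usr/keytab/jaas.conf"

# The consumer needs the same security settings as the producer
cat > consumer.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
EOF
```

Then run `kafka-console-consumer.sh --bootstrap-server 127.0.0.1:6667 --topic <your-topic> --consumer.config consumer.properties` and watch for the messages produced by Canal.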
Billow has tested this setup successfully; if anything is unclear, feel free to message the official account.
