Canal (Alibaba Open Source), Part 5: Delivering Data to Kerberos-Authenticated Kafka
In the previous chapter, Billow showed how to deliver binlog data to Kafka using canal 1.1.1 and above. In real production environments, Kafka clusters are often secured with Kerberos authentication. In this chapter, Billow shows how to modify the source code so that Canal can be configured to deliver data to a Kerberos-authenticated Kafka cluster.
## 1. Importing the Canal Source Code
canal is open-sourced on GitHub: https://github.com/alibaba/canal.git
#### 1.1 Import the Git project into IDEA
The project directory after import (screenshot not included):
#### 1.2 Modifying the Canal entry class
The entry class of the standalone Canal distribution is com.alibaba.otter.canal.deployer.CanalLauncher.
Its main method does the following:
1. Load the configuration.
2. Start a CanalStater from that configuration.
```java
...
logger.info("## load canal configurations");
String conf = System.getProperty("canal.conf", "classpath:canal.properties");
Properties properties = new Properties();
RemoteConfigLoader remoteConfigLoader = null;
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
    conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
    properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
    properties.load(new FileInputStream(conf));
}
...
final CanalStater canalStater = new CanalStater();
canalStater.start(properties);
```
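The config-resolution rule above (a `classpath:` prefix means load from the classloader, anything else from disk) can be sketched in isolation. This is a minimal, self-contained illustration of that branch, not the upstream code:

```java
// Sketch of the config-resolution branch in CanalLauncher.main:
// a path prefixed with "classpath:" is loaded as a classpath resource,
// anything else is treated as a filesystem path.
public class ConfResolveDemo {
    static final String CLASSPATH_URL_PREFIX = "classpath:";

    static String resolve(String conf) {
        if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
            // strip the prefix, as StringUtils.substringAfter does upstream
            return "classpath resource: " + conf.substring(CLASSPATH_URL_PREFIX.length());
        }
        return "file: " + conf;
    }

    public static void main(String[] args) {
        System.out.println(resolve("classpath:canal.properties"));
        System.out.println(resolve("/etc/canal/canal.properties"));
    }
}
```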
In CanalStater.start, the MQ producer is initialized according to the configuration. Billow uses Kafka here, so we only look at the Kafka path.
After CanalKafkaProducer is initialized, the MQ settings are read from the configuration file.
The MQ section of canal.properties looks like this:
```properties
##################################################
#########              MQ              ############
##################################################
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = /usr/keytab/krb5.conf
canal.mq.kafka.kerberos.jaasFilePath = /usr/keytab/jaas.conf
```
Note that values in a .properties file must not be quoted: java.util.Properties would keep the quotes as part of the path.
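As a quick sanity check, the custom keys can be parsed with plain `java.util.Properties`, the same mechanism CanalLauncher uses. A minimal sketch (keys and values taken from the listing above):

```java
import java.io.StringReader;
import java.util.Properties;

// Minimal sketch: parse the custom Kerberos keys the same way canal.properties
// is loaded. Note that Properties does not strip quotes, so values stay unquoted.
public class KerberosConfigCheck {
    public static void main(String[] args) throws Exception {
        String conf = "canal.mq.kafka.kerberos.enable = true\n"
                    + "canal.mq.kafka.kerberos.krb5FilePath = /usr/keytab/krb5.conf\n"
                    + "canal.mq.kafka.kerberos.jaasFilePath = /usr/keytab/jaas.conf\n";
        Properties p = new Properties();
        p.load(new StringReader(conf));
        boolean enable = Boolean.parseBoolean(p.getProperty("canal.mq.kafka.kerberos.enable").trim());
        System.out.println(enable + " " + p.getProperty("canal.mq.kafka.kerberos.krb5FilePath").trim());
    }
}
```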
The keys prefixed with canal.mq.kafka.kerberos are Billow's custom Kerberos settings:

- canal.mq.kafka.kerberos.enable
  Either true or false. When true, the Kafka cluster uses Kerberos authentication and the next two settings are read.
- canal.mq.kafka.kerberos.krb5FilePath
  Read only when canal.mq.kafka.kerberos.enable is true; the path to the Kerberos cluster's krb5.conf file. Example:
```ini
[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 default_realm = HADOOP.COM
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true

[realms]
 HADOOP.COM = {
  kdc = hadoop1.com
  admin_server = hadoop1.com
 }

[domain_realm]
 .hadoop1.com = HADOOP.COM
 hadoop1.com = HADOOP.COM
```
(The [realms] entry must match default_realm, HADOOP.COM, for the example to be consistent.)
- canal.mq.kafka.kerberos.jaasFilePath
  Read only when canal.mq.kafka.kerberos.enable is true; the JAAS configuration used when connecting to Kafka. Example:
```conf
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="E:/resources/billow.keytab"
    principal="billow@HADOOP.COM"
    client=true;
};
```
Since Billow added custom keys to the configuration file, the code must be extended to read them.
Add the following reads in CanalStater's buildMQProperties method:
```java
/**
 * Build the MQ configuration
 *
 * @param properties the canal.properties configuration
 * @return the populated MQProperties
 */
private static MQProperties buildMQProperties(Properties properties) {
    MQProperties mqProperties = new MQProperties();
    ......
    String kafkaKerberosEnable = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
    if (!StringUtils.isEmpty(kafkaKerberosEnable)) {
        mqProperties.setKerberosEnable(Boolean.valueOf(kafkaKerberosEnable));
    }
    String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
    if (!StringUtils.isEmpty(kafkaKerberosKrb5Filepath)) {
        mqProperties.setKerberosKrb5FilePath(kafkaKerberosKrb5Filepath);
    }
    String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
    if (!StringUtils.isEmpty(kafkaKerberosJaasFilepath)) {
        mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);
    }
    return mqProperties;
}
```
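The setters above (setKerberosEnable and friends) imply that MQProperties itself was extended, though the article never shows that change. A plausible sketch of the added fields, with names inferred from the getters/setters used in this article rather than taken from the upstream source:

```java
// Sketch of the fields presumably added to MQProperties. Field and accessor
// names are inferred from the calls shown in this article; this is not a
// copy of the upstream class.
public class MQProperties {
    private boolean kerberosEnable = false;
    private String kerberosKrb5FilePath = "";
    private String kerberosJaasFilePath = "";

    public boolean isKerberosEnable() { return kerberosEnable; }
    public void setKerberosEnable(boolean kerberosEnable) { this.kerberosEnable = kerberosEnable; }
    public String getKerberosKrb5FilePath() { return kerberosKrb5FilePath; }
    public void setKerberosKrb5FilePath(String path) { this.kerberosKrb5FilePath = path; }
    public String getKerberosJaasFilePath() { return kerberosJaasFilePath; }
    public void setKerberosJaasFilePath(String path) { this.kerberosJaasFilePath = path; }

    public static void main(String[] args) {
        MQProperties mq = new MQProperties();
        mq.setKerberosEnable(true);
        mq.setKerberosKrb5FilePath("/usr/keytab/krb5.conf");
        System.out.println(mq.isKerberosEnable() + " " + mq.getKerberosKrb5FilePath());
    }
}
```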
And add the corresponding constants in the CanalConstants class:
```java
/**
 * Common startup constants
 *
 * @author jianghang 2012-11-8 03:15:55 PM
 * @version 1.0.0
 */
public class CanalConstants {
    ...
    public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE = ROOT + "." + "mq.kafka.kerberos.enable";
    public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
    public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
    ...
}
```
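Assuming ROOT resolves to "canal" (which the canal.* keys in canal.properties suggest), the concatenated constants resolve to exactly the keys used in the configuration file. A small check of that assumption:

```java
// Sketch: assuming ROOT is "canal" (inferred from the canal.* keys in
// canal.properties, not confirmed against the upstream source), the new
// constant resolves to the exact key used in the configuration file.
public class ConstantsDemo {
    static final String ROOT = "canal"; // assumption, see comment above
    static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE = ROOT + "." + "mq.kafka.kerberos.enable";

    public static void main(String[] args) {
        System.out.println(CANAL_MQ_KAFKA_KERBEROS_ENABLE);
    }
}
```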
#### 1.3 Configuring CanalKafkaProducer
The previous section added the Kerberos switch and file-path settings. This section shows how to put the Kafka producer into secure mode.
Looking at the source, CanalStater.start initializes a CanalKafkaProducer, and that object's init method holds the KafkaProducer configuration.
Here Billow adds a check: if Kerberos authentication is enabled in the configuration file, the Kafka properties are switched to secure mode and the required JVM system properties are set.
```java
if (kafkaProperties.isKerberosEnable()) {
    // the Kafka cluster has Kerberos authentication enabled
    System.setProperty("java.security.krb5.conf", kafkaProperties.getKerberosKrb5FilePath());
    System.setProperty("java.security.auth.login.config", kafkaProperties.getKerberosJaasFilePath());
    System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
    properties.put("security.protocol", "SASL_PLAINTEXT");
    properties.put("sasl.kerberos.service.name", "kafka");
}
```
In context:
```java
public class CanalKafkaProducer implements CanalMQProducer {

    ...

    @Override
    public void init(MQProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getServers());
        properties.put("acks", kafkaProperties.getAcks());
        properties.put("compression.type", kafkaProperties.getCompressionType());
        properties.put("batch.size", kafkaProperties.getBatchSize());
        properties.put("linger.ms", kafkaProperties.getLingerMs());
        properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
        properties.put("key.serializer", StringSerializer.class.getName());
        if (kafkaProperties.getTransaction()) {
            properties.put("transactional.id", "canal-transactional-id");
        } else {
            properties.put("retries", kafkaProperties.getRetries());
        }
        if (kafkaProperties.isKerberosEnable()) {
            // the Kafka cluster has Kerberos authentication enabled
            System.setProperty("java.security.krb5.conf", kafkaProperties.getKerberosKrb5FilePath());
            System.setProperty("java.security.auth.login.config", kafkaProperties.getKerberosJaasFilePath());
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.kerberos.service.name", "kafka");
        }
        if (!kafkaProperties.getFlatMessage()) {
            properties.put("value.serializer", MessageSerializer.class.getName());
            producer = new KafkaProducer<String, Message>(properties);
        } else {
            properties.put("value.serializer", StringSerializer.class.getName());
            producer2 = new KafkaProducer<String, String>(properties);
        }
        if (kafkaProperties.getTransaction()) {
            if (!kafkaProperties.getFlatMessage()) {
                producer.initTransactions();
            } else {
                producer2.initTransactions();
            }
        }
    }

    ...
}
```
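The effect of the Kerberos branch can be checked without a running cluster by building the producer Properties in isolation. A minimal, self-contained sketch with no Kafka client dependency (method and class names are illustrative, not from the Canal source):

```java
import java.util.Properties;

// Minimal sketch: reproduce only the Kerberos branch of CanalKafkaProducer.init
// to show which client properties and JVM system properties it sets.
// No Kafka client dependency is needed for this check.
public class KerberosProducerConfigDemo {
    static Properties kerberosProps(String krb5Path, String jaasPath) {
        System.setProperty("java.security.krb5.conf", krb5Path);
        System.setProperty("java.security.auth.login.config", jaasPath);
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        Properties properties = new Properties();
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.kerberos.service.name", "kafka");
        return properties;
    }

    public static void main(String[] args) {
        Properties p = kerberosProps("/usr/keytab/krb5.conf", "/usr/keytab/jaas.conf");
        System.out.println(p.get("security.protocol"));
        System.out.println(System.getProperty("java.security.krb5.conf"));
    }
}
```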
## 2. Testing
After modifying the source, build and package:
```shell
mvn clean install -Dmaven.test.skip -Denv=release
```
When the command succeeds, the packaged archives are generated under the project's target folder.
Copy the deployer package to the server, put the cluster's krb5.conf, jaas.conf, and canal.properties files in place, start canal, check the logs, then start a Kafka consumer to consume the data.
Billow has tested this successfully; if anything is unclear, feel free to message the official account~