Zookeeper 实现分布式配置中心
个人备忘前言:zookeeper 五个功能点1. master的管理,如amq 集群,kafka集群。2. 分布式锁(悲观、乐观)3. 分布式配置中心。4. 集群的监管。5. 发布与订阅(队列)。以上五点,都是zookeeper的特性决定的,我们知道zookeeper有两类节点:1. 临时节点。(可顺序)2. 永久节点。(可顺序)再加上zookeeper提供了,对节点
个人备忘
前言:
zookeeper 五个功能点
1. master的管理,如amq 集群,kafka集群。
2. 分布式锁(悲观、乐观)
3. 分布式配置中心。
4. 集群的监管。
5. 发布与订阅(队列)。
以上五点,都是zookeeper的特性决定的,我们知道zookeeper有两类节点:
1. 临时节点。(可顺序)
2. 永久节点。(可顺序)
再加上zookeeper提供了,对节点的监听事件(删除,新增,修改–针对节点),使得我们可以通过节点的监听来做相对应的业务逻辑,以上五点均是这样。
其他:zookeeper的leader 的选举是通过 ZAB 进行的,保证有序性是靠:zxid 。其他核心算法,自行科补 。
简单说下原理,已数据库连接池jdbc 为例,我们通常将配置信息配置到我们的property文件中,用key=value 来进行配置,动态加载到我们的xml 中(原理:xml 读流到内存,property读流到内存,${}表达式作为String 的切割,来动态添加)。我们可以通过zookeeper对节点的监听,来获取配置在远程zookeeper服务节点中的“数据库连接等信息”如:uname ,pass ,url ,driver 。获取到之后,我们通过Spring 的上下文对象,在java中动态获取 datasource 这个bean , 通过datasource 对象的set 方法,将zookeeper节点获取到的连接信息设置到datasource 对象中。我们还可以通过对这些节点的监听(watcher),当配置信息变化后,可以通过 所有的client 对节点的监听来完成对datasource 配置新的连接信息。
代码展示 :
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:annotation-config />
<context:component-scan base-package="com.dongnao.demo" />
<bean id="zkCentralConfigurer" class="com.dongnao.demo.config.ZookeeperCentralConfigurer">
<constructor-arg name="zkServers" value="localhost:2181" />
<constructor-arg name="sessionTimeout" value="1000" />
<constructor-arg name="zkPath" value="/jdbc" />
</bean>
<bean id="zkPlaceholderConfigurer" class="com.dongnao.demo.config.ZookeeperPlaceholderConfigurer">
<property name="zkCentralConfigurer" ref="zkCentralConfigurer" />
<property name="ignoreUnresolvablePlaceholders" value="true" />
<property name="order" value="1" />
</bean>
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource">
<ref bean="dataSource" />
</property>
</bean>
<bean id="dataSource" class="com.zaxxer.hikari.HikariDataSource"
destroy-method="shutdown">
<property name="driverClassName" value="${driver}" />
<property name="jdbcUrl" value="${url}" />
<property name="username" value="${uname}" />
<property name="password" value="${upass}" />
<!-- 连接只读数据库时配置为true, 保证安全 -->
<property name="readOnly" value="false" />
<!-- 等待连接池分配连接的最大时长(毫秒),超过这个时长还没可用的连接则发生SQLException, 缺省:30秒 -->
<property name="connectionTimeout" value="30000" />
<!-- 一个连接idle状态的最大时长(毫秒),超时则被释放(retired),缺省:10分钟 -->
<property name="idleTimeout" value="600000" />
<!-- 一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30秒,参考MySQL
wait_timeout参数(show variables like '%timeout%';) -->
<property name="maxLifetime" value="1800000" />
<!-- 连接池中允许的最大连接数。缺省值:10;推荐的公式:((core_count * 2) + effective_spindle_count) -->
<property name="maximumPoolSize" value="15" />
</bean>
</beans>
ZookeeperCentralConfigurer .java
package com.dongnao.demo.config;
import java.util.List;
import java.util.Properties;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.web.context.ContextLoader;
import org.springframework.web.context.WebApplicationContext;
import com.zaxxer.hikari.HikariDataSource;
public class ZookeeperCentralConfigurer {
private CuratorFramework zkClient;
private TreeCache treeCache;
private String zkServers;
private String zkPath;
private int sessionTimeout;
private Properties props;
public ZookeeperCentralConfigurer(String zkServers, String zkPath, int sessionTimeout) {
this.zkServers = zkServers;
this.zkPath = zkPath;
this.sessionTimeout = sessionTimeout;
this.props = new Properties();
initZkClient();
getConfigData();
addZkListener();
}
private void initZkClient() {
zkClient = CuratorFrameworkFactory.builder().connectString(zkServers).sessionTimeoutMs(sessionTimeout)
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
zkClient.start();
}
private void getConfigData() {
try {
List<String> list = zkClient.getChildren().forPath(zkPath);
for (String key : list) {
String value = new String(zkClient.getData().forPath(zkPath + "/" + key));
if (value != null && value.length() > 0) {
props.put(key, value);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void addZkListener() {
TreeCacheListener listener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) {
getConfigData();
WebApplicationContext ctx = ContextLoader.getCurrentWebApplicationContext();
HikariDataSource dataSource = (HikariDataSource) ctx.getBean("dataSource");
System.out.println("================"+props.getProperty("url"));
dataSource.setJdbcUrl(props.getProperty("url"));
}
}
};
treeCache = new TreeCache(zkClient, zkPath);
try {
treeCache.start();
treeCache.getListenable().addListener(listener);
} catch (Exception e) {
e.printStackTrace();
}
}
public Properties getProps() {
return props;
}
public void setZkServers(String zkServers) {
this.zkServers = zkServers;
}
public void setZkPath(String zkPath) {
this.zkPath = zkPath;
}
public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
}
ZookeeperPlaceholderConfigurer.java
package com.dongnao.demo.config;
import java.util.Properties;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;
public class ZookeeperPlaceholderConfigurer extends PropertyPlaceholderConfigurer {
private ZookeeperCentralConfigurer zkCentralConfigurer;
@Override
protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props)
throws BeansException {
super.processProperties(beanFactoryToProcess, zkCentralConfigurer.getProps());
}
public void setZkCentralConfigurer(ZookeeperCentralConfigurer zkCentralConfigurer) {
this.zkCentralConfigurer = zkCentralConfigurer;
}
}
Zk客户端命令 :
[zk: localhost:2181(CONNECTED) 2]create /jdbc db
Created /jdbc
[zk: localhost:2181(CONNECTED) 2] create /jdbc/url jdbc:mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
Created /jdbc/url
[zk: localhost:2181(CONNECTED) 3] create /jdbc/url jdbc.mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
Node already exists: /jdbc/url
[zk: localhost:2181(CONNECTED) 4] get /jdbc/url
jdbc:mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
cZxid = 0x3
ctime = Tue Mar 13 15:46:46 CST 2018
mZxid = 0x3
mtime = Tue Mar 13 15:46:46 CST 2018
pZxid = 0x3
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 79
numChildren = 0
[zk: localhost:2181(CONNECTED) 5] set /jdbc/url jdbc.mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
[zk: localhost:2181(CONNECTED) 11] create /jdbc/uname root
Created /jdbc/uname
[zk: localhost:2181(CONNECTED) 12] create /jdbc/driver com.mysql.jdbc.Driver
Created /jdbc/driver
[zk: localhost:2181(CONNECTED) 13] create /jdbc/upass agui
Created /jdbc/upass
启动tomcat即可。
更多推荐
所有评论(0)