个人备忘

前言:
zookeeper 五个功能点
1. master的管理,如amq 集群,kafka集群。
2. 分布式锁(悲观、乐观)
3. 分布式配置中心。
4. 集群的监管。
5. 发布与订阅(队列)。

以上五点,都是zookeeper的特性决定的,我们知道zookeeper有两类节点:
1. 临时节点。(可顺序)
2. 永久节点。(可顺序)

再加上zookeeper提供了,对节点的监听事件(删除,新增,修改–针对节点),使得我们可以通过节点的监听来做相对应的业务逻辑,以上五点均是这样。

其他:zookeeper的leader 的选举是通过 ZAB 进行的,保证有序性是靠:zxid 。其他核心算法,自行科补 。

简单说下原理,已数据库连接池jdbc 为例,我们通常将配置信息配置到我们的property文件中,用key=value 来进行配置,动态加载到我们的xml 中(原理:xml 读流到内存,property读流到内存,${}表达式作为String 的切割,来动态添加)。我们可以通过zookeeper对节点的监听,来获取配置在远程zookeeper服务节点中的“数据库连接等信息”如:uname ,pass ,url ,driver 。获取到之后,我们通过Spring 的上下文对象,在java中动态获取 datasource 这个bean , 通过datasource 对象的set 方法,将zookeeper节点获取到的连接信息设置到datasource 对象中。我们还可以通过对这些节点的监听(watcher),当配置信息变化后,可以通过 所有的client 对节点的监听来完成对datasource 配置新的连接信息。

代码展示 :

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd  
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <context:annotation-config />

    <context:component-scan base-package="com.dongnao.demo" />

    <bean id="zkCentralConfigurer" class="com.dongnao.demo.config.ZookeeperCentralConfigurer">
        <constructor-arg name="zkServers" value="localhost:2181" />
        <constructor-arg name="sessionTimeout" value="1000" />
        <constructor-arg name="zkPath" value="/jdbc" />
    </bean>

    <bean id="zkPlaceholderConfigurer" class="com.dongnao.demo.config.ZookeeperPlaceholderConfigurer">
        <property name="zkCentralConfigurer" ref="zkCentralConfigurer" />
        <property name="ignoreUnresolvablePlaceholders" value="true" />
        <property name="order" value="1" />
    </bean>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource">
            <ref bean="dataSource" />
        </property>
    </bean>

    <bean id="dataSource" class="com.zaxxer.hikari.HikariDataSource"
        destroy-method="shutdown">
        <property name="driverClassName" value="${driver}" />
        <property name="jdbcUrl" value="${url}" />
        <property name="username" value="${uname}" />
        <property name="password" value="${upass}" />

        <!-- 连接只读数据库时配置为true, 保证安全 -->
        <property name="readOnly" value="false" />
        <!-- 等待连接池分配连接的最大时长(毫秒),超过这个时长还没可用的连接则发生SQLException, 缺省:30秒 -->
        <property name="connectionTimeout" value="30000" />
        <!-- 一个连接idle状态的最大时长(毫秒),超时则被释放(retired),缺省:10分钟 -->
        <property name="idleTimeout" value="600000" />
        <!-- 一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30秒,参考MySQL 
            wait_timeout参数(show variables like '%timeout%';) -->
        <property name="maxLifetime" value="1800000" />
        <!-- 连接池中允许的最大连接数。缺省值:10;推荐的公式:((core_count * 2) + effective_spindle_count) -->
        <property name="maximumPoolSize" value="15" />
    </bean>
</beans>

ZookeeperCentralConfigurer .java

package com.dongnao.demo.config;

import java.util.List;
import java.util.Properties;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.web.context.ContextLoader;
import org.springframework.web.context.WebApplicationContext;

import com.zaxxer.hikari.HikariDataSource;

public class ZookeeperCentralConfigurer {
    private CuratorFramework zkClient;
    private TreeCache treeCache;

    private String zkServers;
    private String zkPath;
    private int sessionTimeout;
    private Properties props;

    public ZookeeperCentralConfigurer(String zkServers, String zkPath, int sessionTimeout) {
        this.zkServers = zkServers;
        this.zkPath = zkPath;
        this.sessionTimeout = sessionTimeout;
        this.props = new Properties();

        initZkClient();
        getConfigData();
        addZkListener();
    }

    private void initZkClient() {
        zkClient = CuratorFrameworkFactory.builder().connectString(zkServers).sessionTimeoutMs(sessionTimeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        zkClient.start();
    }

    private void getConfigData() {
        try {
            List<String> list = zkClient.getChildren().forPath(zkPath);
            for (String key : list) {
                String value = new String(zkClient.getData().forPath(zkPath + "/" + key));
                if (value != null && value.length() > 0) {
                    props.put(key, value);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void addZkListener() {
        TreeCacheListener listener = new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) {
                    getConfigData();
                    WebApplicationContext ctx = ContextLoader.getCurrentWebApplicationContext();
                    HikariDataSource dataSource = (HikariDataSource) ctx.getBean("dataSource");
                    System.out.println("================"+props.getProperty("url"));
                    dataSource.setJdbcUrl(props.getProperty("url"));
                }
            }
        };

        treeCache = new TreeCache(zkClient, zkPath);
        try {
            treeCache.start();
            treeCache.getListenable().addListener(listener);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public Properties getProps() {
        return props;
    }

    public void setZkServers(String zkServers) {
        this.zkServers = zkServers;
    }

    public void setZkPath(String zkPath) {
        this.zkPath = zkPath;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }
}

ZookeeperPlaceholderConfigurer.java

package com.dongnao.demo.config;

import java.util.Properties;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;

public class ZookeeperPlaceholderConfigurer extends PropertyPlaceholderConfigurer {
    private ZookeeperCentralConfigurer zkCentralConfigurer;

    @Override
    protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props)
            throws BeansException {
        super.processProperties(beanFactoryToProcess, zkCentralConfigurer.getProps());
    }

    public void setZkCentralConfigurer(ZookeeperCentralConfigurer zkCentralConfigurer) {
        this.zkCentralConfigurer = zkCentralConfigurer;
    }
}

Zk客户端命令 :

[zk: localhost:2181(CONNECTED) 2]create /jdbc db
Created /jdbc
[zk: localhost:2181(CONNECTED) 2] create /jdbc/url jdbc:mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
Created /jdbc/url
[zk: localhost:2181(CONNECTED) 3] create /jdbc/url jdbc.mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
Node already exists: /jdbc/url
[zk: localhost:2181(CONNECTED) 4] get /jdbc/url
jdbc:mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
cZxid = 0x3
ctime = Tue Mar 13 15:46:46 CST 2018
mZxid = 0x3
mtime = Tue Mar 13 15:46:46 CST 2018
pZxid = 0x3
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 79
numChildren = 0
[zk: localhost:2181(CONNECTED) 5] set /jdbc/url jdbc.mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
[zk: localhost:2181(CONNECTED) 11] create /jdbc/uname root
Created /jdbc/uname
[zk: localhost:2181(CONNECTED) 12]  create /jdbc/driver com.mysql.jdbc.Driver
Created /jdbc/driver
[zk: localhost:2181(CONNECTED) 13] create /jdbc/upass agui
Created /jdbc/upass

启动tomcat即可。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐