(一)问题描述:
今天跑了一个storm job,job的目的是向redis写入数据,job可以正常运行,但是奇怪的是运行大约3分钟左右就停了,没有报任何的异常……

(二)问题症结(以下为简化代码):

    @Override
    public void execute(Tuple input) {
       KafkaEvent kafkaEvent = (KafkaEvent) input.getValueByField("kafkaEvent");
       Jedis redis = LoadRedisConfig.getJedis();
       String key=...;//简化
       String value=...;
       redis.hincrBy(key, value, 1l);
    }

其中LoadRedisConfig为

public class LoadRedisConfig {
    private static Logger logger = LoggerFactory.getLogger(LoadRedisConfig.class);

    private static Jedis jedis=null;
    private static JedisPool jedisPool = null;


     private static int MAX_ACTIVE = 1024;   
     private static int MAX_IDLE = 200;
     private static int MAX_WAIT = 10000;
     private static int TIMEOUT = 10000;
     private static boolean TEST_ON_BORROW = true;

     static{ 
        initRedisPool("**.***.**.**", 6379, null);
     }

     /**
      * 
      * @param host
      * @param port
      * @return
      */
    public static Jedis getJedis(String host,Integer port) {
        jedis=new Jedis(host,port);
        return  jedis;
    }


    /**
     * 获取JedisPoolConfig
     * @return
     */
    public static JedisPoolConfig getJedisPoolConfig(){
          JedisPoolConfig config = new JedisPoolConfig();
          config.setMaxActive(MAX_ACTIVE);
          config.setMaxIdle(MAX_IDLE);
          config.setMaxWait(MAX_WAIT);
          config.setTestOnBorrow(TEST_ON_BORROW);
          return config;
    }



    /**
     * 初始化JedisPool
     * @param host
     * @param port
     * @param password
     * @return
     */
    public static JedisPool initRedisPool(String host,Integer port,String password){
        JedisPoolConfig config=getJedisPoolConfig();
        try{
            jedisPool = new JedisPool(config, host, port, TIMEOUT, password);
        }
        catch(Exception e){
           if(logger.isInfoEnabled()){
               logger.info("jedis pool 初始化连接异常",e.getMessage());
           }    
        }
        return jedisPool;
    }

    /**
     * 获取Jedis实例
     * @return
     */
     public synchronized static Jedis getJedis() {
         try {
             if (jedisPool != null) {
                 Jedis resource = jedisPool.getResource();
                 return resource;
             } else {
                 return null;
            }
         } catch (Exception e) {
             e.printStackTrace();
             return null;
         }
     }

}

(三)问题分析:

(1)通过查找相关的文档发现,刚开始以为是maxclient问题,我查看自己的conf配置发现:

方法1:

cat redis.conf

这里写图片描述

方法2:

config  get *

maxclient解析:

  1. 设置同一时间最大客户端连接数,默认无限制, Redis 可以同时打开的客户端连接数为 Redis 进程可以打开的最大文件描述符数;

  2. 如果设置 maxclients 0 ,表示不作限制;

  3. 当客户端连接数到达限制时, Redis 会关闭新的连接并向客户端返回 max number of clients reached 错误信息。

(2)真正的问题症结:
经过仔细的分析,问题在于我使用了redis的连接池,设置了MAX_ACTIVE为1024,呵呵了….在这里先简要的解释一下:

           JedisPoolConfig config = new JedisPoolConfig();
           //如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态exhausted(耗尽)。
          config.setMaxActive(MAX_ACTIVE);
           //控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
          config.setMaxIdle(MAX_IDLE);
          //最大等待时间:单位ms
          config.setMaxWait(MAX_WAIT);
          //在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的
          config.setTestOnBorrow(TEST_ON_BORROW);

那么问题就找到了:*我在excute中每次都拿到一个redis实例,当实例超过1024之后拿到的redis对象就为null*!

(四)解决问题

(1)解决方法1:将获取redis连接放在prepare中

private Jedis redis;
public void prepare(Map conf, TopologyContext context, OutputCollector collector){
  redis = LoadRedisConfig.getJedis();
}

同时在cleanup()方法中断开连接

    @Override
    public void cleanup() {
        if (redis != null && redis.isConnected()) {
            redis.disconnect();
        }
    } 

(2)解决方法2:将redis实例返还到连接池

     /**
      * 返还到连接池
      * 
      * @param pool 
      * @param redis
      */
     public static void returnResource(JedisPool pool, Jedis redis) {
         if (redis != null) {
             pool.returnResource(redis);
         }
     }

推荐两种方法结合使用!


(五)附录:JedisPool常用参数配置

  • maxActive:
    控制一个pool可分配多少个jedis实例;如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态exhausted(耗尽)。
config.setMaxActive(MAX_ACTIVE);
  • maxIdle:
    控制一个pool最多有多少个状态为idle(空闲)的jedis实例;
config.setMaxIdle(MAX_IDLE);
  • minIdle:
    至少有多少个状态为idle(空闲)的jedis实例;默认为0;
config.setMinIdle(minIdle);
  • maxWait:
    表示当borrow一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
config.setMaxWait(MAX_WAIT);
  • testOnBorrow:
    在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
config.setTestOnBorrow(true);
  • testWhileIdle:
    如果为true,表示有一个idle object evitor线程对idle object进行扫描,如果validate失败,此object会被从pool中drop掉;这一项只有在timeBetweenEvictionRunsMillis大于0时才有意义;默认为false;
config.setTestWhileIdle(false);
  • timeBetweenEvictionRunsMillis:
    表示idle object evitor两次扫描之间要sleep的毫秒数;
config.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
  • whenExhaustedAction:
    控制一个pool最多有多少个状态为idle(空闲)的jedis实例;

whenExhaustedAction:表示当pool中的jedis实例都被allocated完时,pool要采取的操作;默认有三种。
WHEN_EXHAUSTED_FAIL –> 表示无jedis实例时,直接抛出NoSuchElementException;
WHEN_EXHAUSTED_BLOCK –> 则表示阻塞住,或者达到maxWait时抛出JedisConnectionException;
WHEN_EXHAUSTED_GROW –> 则表示新建一个jedis实例,也就说设置的maxActive无用;

config.setWhenExhaustedAction(whenExhaustedAction);
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐