Integrating Kafka and Storm. Messages enter the Kafka message broker through various channels, for example log data collected with Flume; Kafka buffers them, and the real-time computation framework Storm then analyzes them: a Spout reads the messages from Kafka and hands them to concrete Bolt components for processing. As of apache-storm-0.9.3, Storm ships with an external plugin that integrates Kafka, storm-kafka, which can be used directly. The Maven configuration I use is shown below; the SLF4J exclusions keep conflicting logging bindings off the classpath.
1. Configure the Maven dependencies
<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka_2.10</artifactId>
     <version>0.8.2.0</version>
     <exclusions>
               <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
               </exclusion>
     </exclusions>
</dependency>

<!-- Kafka integration with Storm -->
<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>
     <version>0.9.3</version>
     <scope>provided</scope>
     <exclusions>
          <exclusion>
               <groupId>org.slf4j</groupId>
               <artifactId>log4j-over-slf4j</artifactId>
          </exclusion>
          <exclusion>
               <groupId>org.slf4j</groupId>
               <artifactId>slf4j-api</artifactId>
          </exclusion>
     </exclusions>
</dependency>

<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-kafka</artifactId>
     <version>0.9.3</version>
</dependency>
Note: the topology will receive and process data, but you may see the same messages processed repeatedly. This happens when the bolt never acknowledges its tuples: the KafkaSpout replays anything that is not acked within the tuple timeout. The bolt must call ack on success or fail on error, as the code below does; once that is in place, the duplicates disappear.
2. Write the Storm program
package com.yun.storm;
import java.util.UUID;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

/**
 * Storm topology that reads data from the Kafka message broker
 *
 * @author shenfl
 */
public class KafkaLogProcess {


     private static final String BOLT_ID = LogFilterBolt.class.getName();
     private static final String SPOUT_ID = KafkaSpout.class.getName();

     public static void main(String[] args) {
         
          TopologyBuilder builder = new TopologyBuilder();
          // Address of the ZooKeeper ensemble used by Kafka
          String brokerZkStr = "192.168.2.20:2181";
          ZkHosts zkHosts = new ZkHosts(brokerZkStr);
          // Name of the Kafka topic that holds the data
          String topic = "mytopic";
          // Root path in ZooKeeper under which the KafkaSpout stores its read offsets and other state
          String zkRoot = "/kafkaspout";
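          // Consumer id under zkRoot where the offsets are kept; note that a random
          // UUID means a restarted topology starts fresh instead of resuming from
          // the previously saved offset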
          String id = UUID.randomUUID().toString();
          SpoutConfig spoutconf  = new SpoutConfig(zkHosts, topic, zkRoot, id);
          builder.setSpout(SPOUT_ID, new KafkaSpout(spoutconf));
          builder.setBolt(BOLT_ID, new LogFilterBolt()).shuffleGrouping(SPOUT_ID);
         
          LocalCluster localCluster = new LocalCluster();
          localCluster.submitTopology(KafkaLogProcess.class.getSimpleName(), new Config(), builder.createTopology());
     }
}
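The topology above runs in a LocalCluster, which is convenient during development. To run it on a real cluster you would package the jar and submit it with StormSubmitter; a minimal sketch under the same Storm 0.9.3 API (the worker count and argument handling are illustrative choices, and main must declare throws Exception):

import backtype.storm.StormSubmitter;

Config config = new Config();
config.setNumWorkers(2); // illustrative value

if (args != null && args.length > 0) {
     // e.g. storm jar kafka-storm.jar com.yun.storm.KafkaLogProcess myTopology
     StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} else {
     // Fall back to local mode for development
     LocalCluster localCluster = new LocalCluster();
     localCluster.submitTopology(KafkaLogProcess.class.getSimpleName(), config, builder.createTopology());
}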
package com.yun.storm;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
 * Processes tuples coming from the KafkaSpout and stores the parsed
 * records in the database
 *
 * @author shenfl
 */
public class LogFilterBolt extends BaseRichBolt {

     private OutputCollector collector;
     private static final long serialVersionUID = 1L;

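     // Matches log lines such as:
     //   省公司鉴权接口url[...],响应时间[123],当前时间[1444218216106]
     // capturing the URL, the response time, and the current epoch-millis timestamp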
     Pattern p = Pattern.compile("省公司鉴权接口url\\[(.*)]\\,响应时间\\[([0-9]+)\\],当前时间\\[([0-9]+)\\]");

     /**
      * Called once when each LogFilterBolt instance is initialized
      */
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
          this.collector = collector;
     }

     @Override
     public void execute(Tuple input) {
          try {
               // Read the raw bytes emitted by the KafkaSpout (default field name is "bytes")
               byte[] bytes = input.getBinaryByField("bytes");
               String value = new String(bytes).replaceAll("[\n\r]", "");
               // Parse the data; section 3 extends this block to store it in the database
               Matcher m = p.matcher(value);
               if (m.find()) {
                    String url = m.group(1);
                    String usetime = m.group(2);
                    String currentTime = m.group(3);
                    System.out.println(url + "->" + usetime + "->" + currentTime);
               }
               this.collector.ack(input);
          } catch (Exception e) {
               this.collector.fail(input);
          }
     }

     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
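          // Terminal bolt: nothing is emitted downstream, so there are no fields to declare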
     }
}
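By default the storm-kafka spout uses RawMultiScheme, which emits each Kafka message as a single byte[] field named "bytes"; that is why the bolt calls input.getBinaryByField("bytes"). If you would rather receive strings directly, you can set a StringScheme on the SpoutConfig in KafkaLogProcess; a sketch using the storm-kafka 0.9.x API:

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.StringScheme;

// After constructing the SpoutConfig in KafkaLogProcess:
spoutconf.scheme = new SchemeAsMultiScheme(new StringScheme());

// The spout now emits a single String field named "str", so the bolt reads:
String value = input.getStringByField("str").replaceAll("[\n\r]", "");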


3. Parse the logs and store them in the database
3.1 Add the Maven dependencies
<!-- MySQL and DbUtils dependencies -->
<dependency>
     <groupId>commons-dbutils</groupId>
     <artifactId>commons-dbutils</artifactId>
     <version>1.6</version>
</dependency>
<dependency>
     <groupId>mysql</groupId>
     <artifactId>mysql-connector-java</artifactId>
     <version>5.1.29</version>
</dependency>

3.2 Write the MyDbUtils helper class
(1) Create the table
create database jfyun;

CREATE TABLE `log_info` (
   `id` int(10) NOT NULL AUTO_INCREMENT,
   `topdomain` varchar(100) COLLATE latin1_german1_ci DEFAULT NULL,
   `usetime` varchar(10) COLLATE latin1_german1_ci DEFAULT NULL,
   `time` datetime DEFAULT NULL,
   PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_german1_ci;

(2) The MyDbUtils class
package com.yun.storm.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.ArrayListHandler;

public class MyDbUtils {
    
     private static String className = "com.mysql.jdbc.Driver";
     private static String url = "jdbc:mysql://192.168.2.20:3306/jfyun?useUnicode=true&characterEncoding=utf-8";
     private static String user = "root";
     private static String password = "123";
     private static QueryRunner queryRunner = new QueryRunner();

     public static final String INSERT_LOG = "insert into log_info(topdomain,usetime,time) values(?,?,?)";

     static{
          try {
               Class.forName(className);
          } catch (ClassNotFoundException e) {
               e.printStackTrace();
          }
     }
     public static void main(String[] args) throws Exception {
          String topdomain = "taobao.com";
          String usetime = "100";
          String currentTime = "1444218216106";
          // Convert the epoch-millis timestamp to a datetime string before inserting
          update(INSERT_LOG, topdomain, usetime,
                    MyDateUtils.formatDate2(new Date(Long.parseLong(currentTime))));
     }
     /**
      * Executes an insert/update statement.
      * @param sql    the SQL statement with ? placeholders
      * @param params the values bound to the placeholders
      * @throws SQLException
      */
     public static void update(String sql, Object... params) throws SQLException {
          Connection connection = getConnection();
          try {
               // Execute the update
               queryRunner.update(connection, sql, params);
          } finally {
               connection.close();
          }
     }
    
     public static List<String> executeQuerySql(String sql) {

          List<String> result = new ArrayList<String>();
          try {
               Connection connection = getConnection();
               try {
                    // ArrayListHandler maps each row to an Object[]
                    List<Object[]> rows = queryRunner.query(connection, sql,
                              new ArrayListHandler());
                    for (Object[] row : rows) {
                         // Keep only the first column of each row
                         result.add(row[0].toString());
                    }
               } finally {
                    connection.close();
               }
          } catch (SQLException e) {
               e.printStackTrace();
          }
          return result;
     }
     /**
      * Obtains a new MySQL connection.
      * @throws SQLException
      */
     public static Connection getConnection() throws SQLException {
          return DriverManager.getConnection(url, user, password);
     }
}
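Both MyDbUtils.main and the bolt call MyDateUtils.formatDate2, which the original listing does not include. A minimal sketch consistent with how it is used (the exact pattern string is an assumption; it must produce a value MySQL's datetime column accepts):

package com.yun.storm.util;

import java.text.SimpleDateFormat;
import java.util.Date;

public class MyDateUtils {

     // Assumed pattern: a MySQL-compatible datetime string, e.g. 2015-10-07 18:23:36
     public static String formatDate2(Date date) {
          // SimpleDateFormat is not thread-safe, so create a new instance per call
          return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
     }
}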

(3) Update the Storm program
Extend the if (m.find()) block in LogFilterBolt.execute to write each parsed record to MySQL (add imports for java.util.Date, MyDbUtils, and MyDateUtils). Because MyDbUtils.update throws SQLException, a failed insert propagates to the catch block, which calls fail(input) and lets Storm replay the tuple, giving at-least-once delivery. A note on connection handling follows the snippet.
if (m.find()) {
     String url = m.group(1);
     String usetime = m.group(2);
     String currentTime = m.group(3);
     System.out.println(url + "->" + usetime + "->" + currentTime);

     // Convert the epoch-millis timestamp and insert the record
     MyDbUtils.update(MyDbUtils.INSERT_LOG, url, usetime,
               MyDateUtils.formatDate2(new Date(Long.parseLong(currentTime))));
}
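Note that MyDbUtils.update opens and closes a MySQL connection for every tuple, which gets expensive at high throughput. A common alternative, sketched below rather than taken from the original code, is to hold one connection per bolt instance: open it in prepare and close it in cleanup (the URL and credentials repeat the MyDbUtils settings):

// Additional fields and lifecycle methods inside LogFilterBolt
// (requires java.sql.Connection, java.sql.DriverManager, java.sql.PreparedStatement)
private transient Connection connection;
private transient PreparedStatement insertStmt;

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     this.collector = collector;
     try {
          connection = DriverManager.getConnection(
                    "jdbc:mysql://192.168.2.20:3306/jfyun?useUnicode=true&characterEncoding=utf-8",
                    "root", "123");
          insertStmt = connection.prepareStatement(MyDbUtils.INSERT_LOG);
     } catch (Exception e) {
          throw new RuntimeException(e);
     }
}

@Override
public void cleanup() {
     try {
          insertStmt.close();
          connection.close();
     } catch (Exception e) {
          // best-effort close on shutdown
     }
}

Keep in mind that Storm only guarantees cleanup() runs in local mode, so on a real cluster the connection may simply be dropped when the worker dies.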

(4) Metric queries
-- Average response time (seconds) per URL
SELECT
  topdomain,
  ROUND(AVG(usetime) / 1000, 2) avg_use_time
FROM
  log_info
GROUP BY topdomain;
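The same metric can also be pulled from Java through the executeQuerySql helper above. The helper keeps only the first column of each row, so select the value you want first; a sketch:

// Average response time in seconds; only the first column survives executeQuerySql
List<String> avgTimes = MyDbUtils.executeQuerySql(
          "SELECT ROUND(AVG(usetime) / 1000, 2) AS avg_use_time"
          + " FROM log_info GROUP BY topdomain");
for (String t : avgTimes) {
     System.out.println(t);
}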
