104-storm 整合 kafka之保存MySQL数据库
整合Kafka+Storm,消息通过各种方式进入到Kafka消息中间件,比如通过使用Flume来收集的日志数据,然后暂由Kafka中的路由暂存,然后在由实时计算程序Storm做实时分析,这时候我们需要讲Storm中的Spout中读取Kafka中的消息,然后交由具体的Bolt组件分析处理。实际上在 apache-storm-0.9.3这个版本的Storm已经自带了一个集成Kafka的外部插件程序st
·
整合Kafka+Storm,消息通过各种方式进入到Kafka消息中间件,比如通过使用Flume来收集的日志数据,然后暂由Kafka中的路由暂存,然后在由实时计算程序Storm做实时分析,这时候我们需要讲Storm中的Spout中读取Kafka中的消息,然后交由具体的Bolt组件分析处理。实际上在
apache-storm-0.9.3这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,例如我使用的Maven依赖配置。
1、配置Maven依赖包
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- kafka整合storm -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.3</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.3</version>
</dependency>
storm程序能接收到数据,并进行处理,但是会发现数据被重复处理这是因为在bolt中没有对数据进行确认,需要调用ack或者fail方法, 修改完成之后即可。
2、编写Storm程序
package com.yun.storm;
import java.util.UUID;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
/**
* Storm读取Kafka消息中间件数据
*
* @author shenfl
*
*/
public class KafkaLogProcess {
private static final String BOLT_ID = LogFilterBolt.class.getName();
private static final String SPOUT_ID = KafkaSpout.class.getName();
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
//表示kafka使用的zookeeper的地址
String brokerZkStr = "192.168.2.20:2181";
ZkHosts zkHosts = new ZkHosts(brokerZkStr);
//表示的是kafak中存储数据的主题名称
String topic = "mytopic";
//指定zookeeper中的一个根目录,里面存储kafkaspout读取数据的位置等信息
String zkRoot = "/kafkaspout";
String id = UUID.randomUUID().toString();
SpoutConfig spoutconf = new SpoutConfig(zkHosts, topic, zkRoot, id);
builder.setSpout(SPOUT_ID , new KafkaSpout(spoutconf));
builder.setBolt(BOLT_ID,new LogFilterBolt()).shuffleGrouping(SPOUT_ID);
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(KafkaLogProcess.class.getSimpleName(), new Config(),builder.createTopology() );
}
}
package com.yun.storm;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
/**
* 处理来自KafkaSpout的tuple,并保存到数据库中
*
* @author shenfl
*
*/
public class LogFilterBolt extends BaseRichBolt {
private OutputCollector collector;
/**
*
*/
private static final long serialVersionUID = 1L;
Pattern p = Pattern.compile("省公司鉴权接口url\\[(.*)]\\,响应时间\\[([0-9]+)\\],当前时间\\[([0-9]+)\\]");
/**
* 每个LogFilterBolt实例仅初始化一次
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
try {
// 接收KafkaSpout的数据
byte[] bytes = input.getBinaryByField("bytes");
String value = new String(bytes).replaceAll("[\n\r]", "");
// 解析数据并入库
Matcher m = p.matcher(value);
if (m.find()) {
String url = m.group(1);
String usetime = m.group(2);
String currentTime = m.group(3);
System.out.println(url + "->" + usetime + "->" + currentTime);
}
this.collector.ack(input);
} catch (Exception e) {
this.collector.fail(input);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
3、解析日志入库
3.1 引入Maven依赖包
<!-- mysql maven相关依赖 -->
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.29</version>
</dependency>
3.2 编写MyDbUtils工具类
(1)创建数据表
create database jfyun;
CREATE TABLE `log_info` (
`id` int(10) NOT NULL AUTO_INCREMENT,
`topdomain` varchar(100) COLLATE latin1_german1_ci DEFAULT NULL,
`usetime` varchar(10) COLLATE latin1_german1_ci DEFAULT NULL,
`time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1803 DEFAULT CHARSET=latin1 COLLATE=latin1_german1_ci
(2)MyDbUtils的程序
package com.yun.storm.util;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.commons.dbutils.BasicRowProcessor;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.ArrayListHandler;
public class MyDbUtils {
private static String className = "com.mysql.jdbc.Driver";
private static String url = "jdbc:mysql://192.168.2.20:3306/jfyun?useUnicode=true&characterEncoding=utf-8";
private static String user = "root";
private static String password = "123";
private static QueryRunner queryRunner = new QueryRunner();
public static final String INSERT_LOG = "insert into log_info(topdomain,usetime,time) values(?,?,?)";
static{
try {
Class.forName(className);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
String topdomain = "taobao.com";
String usetime = "100";
String currentTime="1444218216106";
MyDbUtils.update(MyDbUtils.INSERT_LOG, topdomain,usetime,currentTime);
update(INSERT_LOG,topdomain,usetime,MyDateUtils.formatDate2(new Date(Long.parseLong(currentTime))));
}
/**
* @param conn
* @throws SQLException
*/
public static void update(String sql,Object... params) throws SQLException {
Connection connection = getConnection();
//更新数据
queryRunner.update(connection,sql, params);
connection.close();
}
public static List<String> executeQuerySql(String sql) {
List<String> result = new ArrayList<String>();
try {
List<Object[]> requstList = queryRunner.query(getConnection(), sql,
new ArrayListHandler(new BasicRowProcessor() {
@Override
public <Object> List<Object> toBeanList(ResultSet rs,
Class<Object> type) throws SQLException {
return super.toBeanList(rs, type);
}
}));
for (Object[] objects : requstList) {
result.add(objects[0].toString());
}
} catch (SQLException e) {
e.printStackTrace();
}
return result;
}
/**
* @throws SQLException
*
*/
public static Connection getConnection() throws SQLException {
//获取mysql连接
return DriverManager.getConnection(url, user, password);
}
}
(3)修改storm程序
if (m.find()) {
url = m.group(1);
usetime = m.group(2);
currentTime = m.group(3);
System.out.println(url + "->" + usetime + "->" + currentTime);
MyDbUtils.update(MyDbUtils.INSERT_LOG, url, usetime,
MyDateUtils.formatDate2(new Date(Long.parseLong(currentTime))));
}
(4)统计指标
--统计每个url平均响应时间
SELECT
topdomain,
ROUND(AVG(usetime) / 1000, 2) avg_use_time
FROM
log_info
GROUP BY topdomain;
更多推荐
已为社区贡献3条内容
所有评论(0)