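105 - Storm and Kafka integration: saving data to HBase
Logs collected by flume-ng are passed through Kafka as middleware and consumed in real time by Storm. To support later analysis of the raw data, the same data is also backed up to HBase.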
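1. Save the raw data to HBase in preparation for later offline analysis. The approach:
(1) Create an HBaseConsumer that acts as a Kafka consumer (implemented in step 4 as StormKafkaToHBaseCustomer)
(2) Save the data consumed from Kafka to HBase
2. Start the services
(1) Start ZooKeeper, Kafka, and Flume. Note that the Kafka command below starts a console consumer for watching mytopic; the Kafka broker itself must already be running.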
$ ./zkServer.sh start
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic
$ bin/flume-ng agent --conf conf --conf-file conf/flume-kafka-conf.properties --name agent1 -Dflume.root.logger=INFO,console
(2) Start HDFS
$ start-dfs.sh
(3) Start HBase
$ start-hbase.sh
3. Create the HBase table
Create the table from the HBase shell:
create 'log_info', {NAME => 'info'}
hbase(main):002:0> create 'log_info', {NAME => 'info'}
ERROR: java.io.IOException: Table Namespace Manager not ready yet, try again later
at org.apache.hadoop.hbase.master.HMaster.getNamespaceDescriptor(HMaster.java:3447)
at org.apache.hadoop.hbase.master.HMaster.createTable(HMaster.java:1845)
at org.apache.hadoop.hbase.master.HMaster.createTable(HMaster.java:2025)
at org.apache.hadoop.hbase.protobuf.generated.MasterProtos$MasterService$2.callBlockingMethod(MasterProtos.java:42280)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2107)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:101)
at org.apache.hadoop.hbase.ipc.FifoRpcScheduler$1.run(FifoRpcScheduler.java:74)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
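Creating the HBase table failed with the error above. The fix that worked here:
Connect to ZooKeeper with zkCli.sh and delete everything under HBase's hbase.rootdir path; after that, creating the table succeeds. This wipes HBase's existing state, so it is only appropriate in a test environment.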
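The error text itself says "try again later", and when the HMaster is simply still initializing, waiting and retrying may be enough before resorting to deleting data. Below is a minimal sketch of such a retry using only the Java standard library. The class name RetryLater, the attempt count, and the delay are illustrative assumptions, and the Callable stands in for whatever call creates the table.

```java
import java.util.concurrent.Callable;

final class RetryLater {
    /**
     * Runs the action, retrying after a fixed delay whenever it throws.
     * Rethrows the last failure once maxAttempts is exhausted.
     */
    static <T> T retry(Callable<T> action, int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                // Wait before the next attempt, but not after the final one.
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;
    }
}
```

If the error persists after several attempts spread over a minute or so, the state is probably stale rather than still initializing, and the cleanup described above applies.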
4. Write the Kafka consumer that saves to HBase
package com.yun.kafka;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.yun.hbase.HBaseUtils;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Reads data from Kafka and saves it to HBase.
 *
 * @author shenfl
 */
public class StormKafkaToHBaseCustomer extends Thread {
    // Captures three fields from each log line: the provincial auth interface url,
    // the response time (响应时间), and the current time (当前时间).
    Pattern p = Pattern.compile("省公司鉴权接口url\\[(.*)]\\,响应时间\\[([0-9]+)\\],当前时间\\[([0-9]+)\\]");
    private ConsumerConnector consumerConnector;

    public StormKafkaToHBaseCustomer() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.2.20:2181");
        // Set the consumer group
        props.put("group.id", "jf-group");
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumerConnector = Consumer.createJavaConsumerConnector(config);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // 1 = number of streams (consumer threads) to create for the topic
        topicCountMap.put("mytopic", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams = consumerConnector
                .createMessageStreams(topicCountMap);
        HBaseUtils hbase = new HBaseUtils();
        // Obtain the stream and its iterator once; hasNext() blocks while waiting for the next message
        KafkaStream<byte[], byte[]> kafkaStream = createMessageStreams.get("mytopic").get(0);
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        while (iterator.hasNext()) {
            // Read the next message from the Kafka topic
            MessageAndMetadata<byte[], byte[]> mm = iterator.next();
            // Decode explicitly as UTF-8, since the pattern contains Chinese text
            String v = new String(mm.message(), StandardCharsets.UTF_8);
            Matcher m = p.matcher(v);
            if (m.find()) {
                String url = m.group(1);
                String usetime = m.group(2);
                String currentTime = m.group(3);
                System.out.println(Thread.currentThread().getId() + "=>" + url + "->" + usetime + "->" + currentTime);
                // Save the raw data to HBase, e.g. http://hn.auth.com->2000->1444274868019;
                // the row key is "auth_" followed by the timestamp
                hbase.put("log_info", "auth_" + currentTime, "info", "url", url);
                hbase.put("log_info", "auth_" + currentTime, "info", "usetime", usetime);
                hbase.put("log_info", "auth_" + currentTime, "info", "currentTime", currentTime);
            }
        }
    }

    public static void main(String[] args) {
        StormKafkaToHBaseCustomer stormKafkaToHBaseCustomer = new StormKafkaToHBaseCustomer();
        stormKafkaToHBaseCustomer.start();
    }
}
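To make the regular expression in the consumer easier to check on its own, here is a small standalone sketch that applies the same pattern to one sample line. The sample line is an assumption, reconstructed from the pattern and from the values in the code comment (http://hn.auth.com, 2000, 1444274868019). The class and method names are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class AuthLogParser {
    // Same pattern as in StormKafkaToHBaseCustomer: url, response time, current time.
    static final Pattern AUTH_LINE = Pattern.compile(
            "省公司鉴权接口url\\[(.*)]\\,响应时间\\[([0-9]+)\\],当前时间\\[([0-9]+)\\]");

    /** Returns {url, usetime, currentTime}, or null when the line does not match. */
    static String[] parse(String line) {
        Matcher m = AUTH_LINE.matcher(line);
        if (!m.find()) {
            return null;
        }
        return new String[] { m.group(1), m.group(2), m.group(3) };
    }

    public static void main(String[] args) {
        // Assumed sample line; real log lines may carry extra text before or after.
        String line = "省公司鉴权接口url[http://hn.auth.com],响应时间[2000],当前时间[1444274868019]";
        String[] fields = parse(line);
        System.out.println(fields[0] + "->" + fields[1] + "->" + fields[2]);
    }
}
```

Because (.*) is greedy, a line that contained the "],响应时间" separator more than once would capture up to the last occurrence. A narrower group such as ([^\\]]*) is one option if the URLs never contain a closing bracket.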
5. Verify the data in HBase
hbase(main):030:0> get 'log_info', 'auth_1444314527110'
COLUMN CELL
info:currentTime timestamp=1444314527104, value=1444314527110
info:url timestamp=1444314527087, value=http://hn.auth.com
info:usetime timestamp=1444314527096, value=2000
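The get output shows one row with three columns because the consumer issues three put calls that share the same row key. Below is a rough in-memory model of that behavior, useful for reasoning about the row key ("auth_" plus currentTime) without a cluster. InMemoryLogStore is a hypothetical stand-in whose put mirrors the argument order used with HBaseUtils in step 4; it is not the real HBase client API.

```java
import java.util.Map;
import java.util.TreeMap;

final class InMemoryLogStore {
    // table name -> row key -> "family:qualifier" -> value (the latest value wins)
    private final Map<String, Map<String, Map<String, String>>> tables = new TreeMap<>();

    void put(String table, String rowKey, String family, String qualifier, String value) {
        tables.computeIfAbsent(table, t -> new TreeMap<>())
              .computeIfAbsent(rowKey, r -> new TreeMap<>())
              .put(family + ":" + qualifier, value);
    }

    /** Returns the columns of one row, or an empty map if the row does not exist. */
    Map<String, String> get(String table, String rowKey) {
        Map<String, Map<String, String>> rows = tables.get(table);
        if (rows == null || !rows.containsKey(rowKey)) {
            return new TreeMap<>();
        }
        return rows.get(rowKey);
    }

    public static void main(String[] args) {
        InMemoryLogStore store = new InMemoryLogStore();
        String currentTime = "1444314527110";
        String rowKey = "auth_" + currentTime;
        store.put("log_info", rowKey, "info", "url", "http://hn.auth.com");
        store.put("log_info", rowKey, "info", "usetime", "2000");
        store.put("log_info", rowKey, "info", "currentTime", currentTime);
        // Three puts, one row; columns come back sorted by name.
        System.out.println(store.get("log_info", rowKey));
    }
}
```

One consequence of this key design is that two events logged in the same millisecond share a row key, so the later write shadows the earlier one. Appending a sequence number or a hash of the URL to the key is one way to avoid that.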
At this point the consumer program reads the data from Kafka and saves it to HBase, where it is available for later analysis.