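1. Save the raw data to HBase so that it is available for later offline analysis. The approach:
(1) Create an HBaseConsumer that acts as a Kafka consumer
(2) Save the data consumed from Kafka to HBase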

2. Start the services
(1) Start ZooKeeper, Kafka, and Flume
$ ./zkServer.sh start
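# Start the Kafka broker (the config path is assumed to be the distribution default)
$ bin/kafka-server-start.sh config/server.properties
# Optional: watch the topic with a console consumer
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic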
$ bin/flume-ng agent --conf conf --conf-file conf/flume-kafka-conf.properties --name agent1 -Dflume.root.logger=INFO,console

(2) Start HDFS
$ start-dfs.sh
(3) Start HBase
$ start-hbase.sh
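
Before moving on, it can help to confirm that the services started above are actually accepting connections. The following is a minimal sketch using only the JDK. The class name PortCheck is hypothetical; 2181 is the ZooKeeper port used in this article, while 9092 is Kafka's default broker port and is an assumption about this setup.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isListening(String host, int port, int timeoutMs) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do if close fails
            }
        }
    }

    public static void main(String[] args) {
        // 2181: ZooKeeper (from this article); 9092: Kafka default port (assumed)
        int[] ports = {2181, 9092};
        for (int port : ports) {
            System.out.println("localhost:" + port + " listening: "
                    + isListening("localhost", port, 1000));
        }
    }
}
```

An open port only shows that the process is up. It does not show that HBase has finished initializing, so a table create can still fail shortly after startup.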

3. Create the HBase table
Run the following in the HBase shell:
create 'log_info', {NAME => 'info'}

hbase(main):002:0>  create  'log_info' , {NAME =>  'info' }

ERROR: java.io.IOException: Table Namespace Manager not ready yet, try again later
        at org.apache.hadoop.hbase.master.HMaster.getNamespaceDescriptor(HMaster.java:3447)
        at org.apache.hadoop.hbase.master.HMaster.createTable(HMaster.java:1845)
        at org.apache.hadoop.hbase.master.HMaster.createTable(HMaster.java:2025)
        at org.apache.hadoop.hbase.protobuf.generated.MasterProtos$MasterService$2.callBlockingMethod(MasterProtos.java:42280)
        at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2107)
        at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:101)
        at org.apache.hadoop.hbase.ipc.FifoRpcScheduler$1.run(FifoRpcScheduler.java:74)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
     
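Creating the table failed with the error above.
The fix that worked here: connect to ZooKeeper with zkCli.sh and delete the data HBase keeps under the path configured as hbase.rootdir, then create the table again.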

4. Write the Kafka consumer that saves to HBase
package com.yun.kafka;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.yun.hbase.HBaseUtils;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Reads data from Kafka and saves it to HBase.
 *
 * @author shenfl
 *
 */
public class StormKafkaToHBaseCustomer extends Thread {

    // Captures the URL, response time and timestamp from log lines of the form
    // 省公司鉴权接口url[...],响应时间[...],当前时间[...]
    Pattern p = Pattern.compile("省公司鉴权接口url\\[(.*)]\\,响应时间\\[([0-9]+)\\],当前时间\\[([0-9]+)\\]");
    private ConsumerConnector consumerConnector;

    public StormKafkaToHBaseCustomer() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.2.20:2181");
        // Set the consumer group
        props.put("group.id", "jf-group");
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumerConnector = Consumer.createJavaConsumerConnector(config);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // The value is the number of streams to create for the topic, not a record count
        topicCountMap.put("mytopic", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams = consumerConnector
                .createMessageStreams(topicCountMap);

        HBaseUtils hbase = new HBaseUtils();
        // Only one stream was requested, so fetch it and its iterator once, outside the loop
        KafkaStream<byte[], byte[]> kafkaStream = createMessageStreams.get("mytopic").get(0);
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        // hasNext() blocks until the next message arrives from the topic
        while (iterator.hasNext()) {
            MessageAndMetadata<byte[], byte[]> mm = iterator.next();
            // Decode explicitly as UTF-8 because the log lines contain Chinese text
            String v = new String(mm.message(), StandardCharsets.UTF_8);
            Matcher m = p.matcher(v);
            if (m.find()) {
                String url = m.group(1);
                String usetime = m.group(2);
                String currentTime = m.group(3);
                System.out.println(Thread.currentThread().getId() + "=>" + url + "->" + usetime + "->" + currentTime);
                // Save the raw data to HBase, e.g. http://hn.auth.com->2000->1444274868019;
                // the row key is "auth_" + the timestamp
                hbase.put("log_info", "auth_" + currentTime, "info", "url", url);
                hbase.put("log_info", "auth_" + currentTime, "info", "usetime", usetime);
                hbase.put("log_info", "auth_" + currentTime, "info", "currentTime", currentTime);
            }
        }
    }

    public static void main(String[] args) {
        StormKafkaToHBaseCustomer stormKafkaToHBaseCustomer = new StormKafkaToHBaseCustomer();
        stormKafkaToHBaseCustomer.start();
    }
}
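
The parsing step of the consumer can be tried on its own, without Kafka or HBase running. The sketch below reuses the same regular expression and builds the row key and column values the way the consumer does. The class name LogLineParser and the sample log line are hypothetical; the sample is reconstructed from the pattern and the example values in the code comment, not taken from real logs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogLineParser {

    // Same pattern as in the consumer
    private static final Pattern P = Pattern.compile(
            "省公司鉴权接口url\\[(.*)]\\,响应时间\\[([0-9]+)\\],当前时间\\[([0-9]+)\\]");

    /** Returns qualifier -> value pairs for one log line, or null if the line does not match. */
    public static Map<String, String> parse(String line) {
        Matcher m = P.matcher(line);
        if (!m.find()) {
            return null;
        }
        Map<String, String> columns = new LinkedHashMap<String, String>();
        columns.put("url", m.group(1));
        columns.put("usetime", m.group(2));
        columns.put("currentTime", m.group(3));
        return columns;
    }

    /** Builds the row key the same way the consumer does: "auth_" + timestamp. */
    public static String rowKey(Map<String, String> columns) {
        return "auth_" + columns.get("currentTime");
    }

    public static void main(String[] args) {
        // Hypothetical sample line, reconstructed from the pattern
        String line = "省公司鉴权接口url[http://hn.auth.com],响应时间[2000],当前时间[1444274868019]";
        Map<String, String> columns = parse(line);
        System.out.println(rowKey(columns) + " " + columns);
    }
}
```

Because the row key is built only from the timestamp, two log lines carrying the same millisecond value would write to the same row, and the later one would overwrite the earlier one's current values.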

5. Verify the data in HBase
hbase(main):030:0> get  'log_info', 'auth_1444314527110'
COLUMN                 CELL
info:currentTime       timestamp=1444314527104, value=1444314527110
info:url               timestamp=1444314527087, value=http://hn.auth.com
info:usetime           timestamp=1444314527096, value=2000
At this point the consumer program reads the data from Kafka and saves it to HBase, where it is available for later analysis.
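
When checking rows by hand, it helps to turn the numeric suffix of the row key into a readable time. The sketch below assumes the currentTime value is epoch milliseconds, which fits the HBase cell timestamps shown above being within a few milliseconds of it. The class name RowKeyTime is hypothetical.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class RowKeyTime {

    /** Extracts the numeric suffix from a row key of the form "auth_<millis>". */
    public static long millisOf(String rowKey) {
        return Long.parseLong(rowKey.substring(rowKey.indexOf('_') + 1));
    }

    /** Formats epoch milliseconds as a UTC timestamp. */
    public static String formatUtc(long millis) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(new Date(millis));
    }

    public static void main(String[] args) {
        // Row key taken from the get command above
        System.out.println(formatUtc(millisOf("auth_1444314527110")));
    }
}
```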

