Storm, Kafka, and HBase Integration: a Java Example
My requirement is to pull data from Kafka, process it, and finally save it to HBase.
1. Topology
Here my spout is the KafkaSpout class from storm-kafka-0.93.jar, used as the input source. The spout I wrote myself is included at the end; it has a problem and tends to hang the topology.
package test.kafka;
import java.util.Arrays;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import test.SimpleBolt;
import test.SimpleBolt2;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
public class KafkaTopology {
    public static void main(String[] args) {
        try {
            // Instantiate the TopologyBuilder.
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            // Configure the spout and its parallelism; the parallelism hint controls
            // how many executor threads this component gets across the cluster.
            // String zks = "h1:2181,h2:2181,h3:2181";
            // String topic = "my-replicated-topic5";
            // String zkRoot = "/storm"; // default ZooKeeper root configuration for Storm
            // String id = "word";
            //
            // BrokerHosts brokerHosts = new ZkHosts(zks);
            // SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
            // spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
            // spoutConf.forceFromStart = true;
            // spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
            // spoutConf.zkPort = 2181;
            BrokerHosts brokerHosts = new ZkHosts("10.10.92.161:2181");
            String topic = "TEST-TOPIC3";
            // Configure the Kafka topic to subscribe to, plus the ZooKeeper node path and consumer id.
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, "/storm", "jd-group5");
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            spoutConfig.zkServers = Arrays.asList(new String[] {"10.10.92.161"});
            spoutConfig.zkPort = 2181;
            // startOffsetTime: -2 = start from the earliest Kafka offset, -1 = start from the latest,
            // otherwise resume from the offset stored in ZooKeeper.
            // spoutConfig.startOffsetTime = Integer.valueOf(StormAppConfigUtil.get(CommonConstant.KAFKA_CONF_OFFSETTIME));
            // spoutConfig.forceFromStart = Boolean.valueOf(StormAppConfigUtil.get(CommonConstant.KAFKA_CONF_FORCESTART));
            spoutConfig.forceFromStart = false; // whether to re-read from the beginning every time; false = do not
            KafkaSpout receiver = new KafkaSpout(spoutConfig);
            topologyBuilder.setSpout("kafka-spout", receiver, 5).setNumTasks(10);
            // Configure the processing bolts and their parallelism; shuffle grouping
            // distributes the spout's tuples randomly across the bolt tasks.
            topologyBuilder.setBolt("kafka-bolt", new SimpleBolt(), 5).setNumTasks(10).shuffleGrouping("kafka-spout");
            topologyBuilder.setBolt("kafka-hbase-bolt", new SimpleBolt2(), 5).setNumTasks(10).shuffleGrouping("kafka-bolt");
            Config config = new Config();
            config.setDebug(false);
            if (args != null && args.length > 0) {
                /* Set how many worker slots this topology claims on the Storm cluster; one slot corresponds
                   to one worker process on a supervisor node. If you request more slots than the cluster has
                   free, the topology may still be accepted but will not actually run: for example, if other
                   topologies leave only 2 free workers and you ask for 4, the submission succeeds but nothing
                   executes. Once you kill some topologies and slots are freed, this topology starts running
                   normally. */
                config.setNumWorkers(1);
                StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
            } else {
                // Startup code for local mode.
                config.setNumWorkers(2);
                config.setMaxTaskParallelism(1);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("simple", config, topologyBuilder.createTopology());
                // Thread.sleep(5000);
                // cluster.killTopology("simple");
                // cluster.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
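The commented-out startOffsetTime line above corresponds to the three starting positions described in the comment (-2, -1, or resume from ZooKeeper). If you prefer not to hard-code the numbers, the old storm-kafka API exposes them as constants on kafka.api.OffsetRequest. A minimal sketch under that assumption (the helper class below is mine, not part of the original project):
package test.kafka;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
// Hypothetical helper, not part of the original project: builds a SpoutConfig
// that explicitly starts from the earliest available offset.
public class SpoutOffsetSketch {
    public static SpoutConfig earliestFrom(String zk, String topic) {
        BrokerHosts hosts = new ZkHosts(zk);
        SpoutConfig conf = new SpoutConfig(hosts, topic, "/storm", "jd-group5");
        conf.scheme = new SchemeAsMultiScheme(new StringScheme());
        // -2 = earliest available offset; -1 would be kafka.api.OffsetRequest.LatestTime()
        conf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
        // forceFromStart = true makes startOffsetTime win over the offset stored in ZooKeeper
        conf.forceFromStart = true;
        return conf;
    }
}
As far as I understand, with forceFromStart = false the value of startOffsetTime is only used when no offset has been stored in ZooKeeper yet.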
2. A bolt that processes the data pulled from Kafka
package test;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Takes the raw Kafka message off the tuple, appends a processing marker,
 * and emits it downstream together with the original message as an id.
 */
@SuppressWarnings("serial")
public class SimpleBolt extends BaseRichBolt {
    private OutputCollector collector;
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("info", "id"));
    }
    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }
    @Override
    public void execute(Tuple input) {
        try {
            String mesg = input.getString(0);
            if (mesg != null) {
                collector.emit(new Values(mesg + "mesg is processed!", mesg));
                // System.out.println("Bolt" + this.hashCode() + ":" + mesg);
            }
            collector.ack(input);
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(input); // fail instead of ack so the tuple can be replayed
        }
    }
}
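One detail worth pointing out in SimpleBolt: the tuple it emits is not anchored to the input, so even when the HBase bolt calls fail(), the failure cannot propagate back to the KafkaSpout and the Kafka message is not replayed. If end-to-end replay is what you want, the emit can be anchored. A minimal sketch of that variant (the class name AnchoredSimpleBolt is mine, not from the original post):
package test;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
// Hypothetical variant of SimpleBolt (class name is mine), identical except that
// the emitted tuple is anchored to the input tuple.
@SuppressWarnings("serial")
public class AnchoredSimpleBolt extends BaseRichBolt {
    private OutputCollector collector;
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("info", "id"));
    }
    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }
    @Override
    public void execute(Tuple input) {
        try {
            String mesg = input.getString(0);
            if (mesg != null) {
                // Anchoring: passing the input tuple as the first argument means a fail()
                // in the downstream HBase bolt propagates back to the spout for replay.
                collector.emit(input, new Values(mesg + "mesg is processed!", mesg));
            }
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
        }
    }
}
The only difference from SimpleBolt is the extra input argument to emit().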
3. A bolt that writes into HBase
package test;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import config.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
/**
 * Takes the processed message and writes it to the HBase table "xyz",
 * using the "id" field as the row key and column cf1:val for the payload.
 */
@SuppressWarnings("serial")
public class SimpleBolt2 extends BaseRichBolt {
    private OutputCollector collector;
    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }
    @Override
    public void execute(Tuple input) {
        try {
            String id = input.getStringByField("id");
            String mesg = input.getStringByField("info");
            if (mesg != null) {
                Table table = Config.con.getTable(TableName.valueOf("xyz"));
                try {
                    // One Put represents one row; another new Put would be a second row.
                    // The row key is the value passed to the Put constructor.
                    Put put = new Put(id.getBytes());
                    // First column of this row: column family "cf1", qualifier "val".
                    put.addColumn("cf1".getBytes(), "val".getBytes(), mesg.getBytes());
                    table.put(put);
                } finally {
                    table.close(); // release the table handle; the shared Connection stays open
                }
            }
            collector.ack(input);
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(input); // fail instead of ack so the tuple can be replayed
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing is emitted downstream
    }
}
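Opening a Table and writing a single Put per tuple works, but at higher message rates every tuple pays the cost of one round trip to HBase. The HBase 1.x client used here also provides BufferedMutator, which buffers Puts on the client and flushes them in batches. A rough sketch of a buffered variant (the class name is mine, and note the trade-off flagged in the comments):
package test;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Put;
import config.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
// Hypothetical alternative to SimpleBolt2 (class name is mine): buffers Puts client-side
// with BufferedMutator instead of opening a Table and writing one Put per tuple.
@SuppressWarnings("serial")
public class BufferedHBaseBolt extends BaseRichBolt {
    private OutputCollector collector;
    private transient BufferedMutator mutator;
    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            // One mutator per bolt task, reused for every tuple.
            mutator = Config.con.getBufferedMutator(TableName.valueOf("xyz"));
        } catch (Exception e) {
            throw new RuntimeException("could not open BufferedMutator", e);
        }
    }
    @Override
    public void execute(Tuple input) {
        try {
            String id = input.getStringByField("id");
            String mesg = input.getStringByField("info");
            if (mesg != null) {
                Put put = new Put(id.getBytes());
                put.addColumn("cf1".getBytes(), "val".getBytes(), mesg.getBytes());
                mutator.mutate(put); // buffered; flushed automatically when the buffer fills
            }
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
        }
    }
    @Override
    public void cleanup() {
        // cleanup() is not guaranteed to run on a cluster, so don't rely on it alone for flushing.
        try {
            mutator.close(); // flushes any remaining buffered Puts
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
Because mutate() only buffers the write, a tuple may be acked before its Put actually reaches HBase, so this trades some failure-safety for throughput.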
4. An HBase configuration class
package config;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
/**
 * Holds a single shared HBase Connection, created once per JVM (i.e. once per Storm worker).
 */
public class Config {
    public static Configuration configuration;
    public static Connection con;
    static {
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set("hbase.zookeeper.quorum", "10.10.92.151");
        try {
            con = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
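Because all of the HBase settings live in this one static block, it is easy to sanity-check them outside of Storm before submitting the topology. A small throwaway check along these lines (the class name is mine; it assumes the table xyz from section 3 already exists):
package config;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
// Hypothetical one-off check, not part of the topology: verifies that the shared
// Connection in Config works and that the target table exists.
public class ConfigCheck {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Config.con.getAdmin()) {
            TableName table = TableName.valueOf("xyz");
            System.out.println("connected to HBase, table 'xyz' exists: " + admin.tableExists(table));
        } finally {
            Config.con.close();
        }
    }
}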
Finally, here is the spout I wrote myself. Its problem is that it often hangs the topology; my workaround was to wait 1 millisecond after each emit, and then the problem no longer appeared. I don't really know why; I'm still not very familiar with either Storm or Kafka. (A note on the likely cause, and a sketch of one way to restructure the spout, follow the code below.)
package test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class KafkaSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private static ConsumerConnector consumer;
    /**
     * Initialize the collector and the Kafka high-level consumer here.
     *
     * @param conf
     * @param context
     * @param collector
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        Properties props = new Properties();
        // ZooKeeper configuration
        props.put("zookeeper.connect", "10.10.92.161:2181");
        // group.id identifies the consumer group
        props.put("group.id", "jd-group5");
        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }
    /**
     * Storm calls this method over and over; each call may emit one tuple into the cluster.
     */
    @Override
    public void nextTuple() {
        try {
            // String msg = info[rd.nextInt(10)];
            // // emit the message
            // collector.emit(new Values(msg), rd.nextInt(100));
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("TEST-TOPIC2", new Integer(1));
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
            // NOTE: creating the message streams on every nextTuple() call and then looping
            // over the blocking iterator below never returns control to Storm, which is the
            // most likely reason the topology appears to hang.
            Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            KafkaStream<String, String> stream = consumerMap.get("TEST-TOPIC2").get(0);
            ConsumerIterator<String, String> it = stream.iterator();
            System.out.println(123456);
            while (it.hasNext()) {
                String message = it.next().message();
                collector.emit(new Values(message));
            }
            // simulate waiting 100 ms
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * Declare the output field here. The field name matters mostly for fields grouping;
     * the declarer can also be used to call declarer.declareStream() to define stream ids
     * for more complex topologies.
     *
     * @param declarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("source"));
    }
    @Override
    public void ack(Object msgId) {
        System.out.println("Tuple processed successfully: " + msgId);
    }
    @Override
    public void fail(Object msgId) {
        System.out.println("Tuple processing failed: " + msgId);
    }
}
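As flagged in the comment inside nextTuple() above, the blocking consumer loop is the likely culprit: Storm expects nextTuple() to return quickly and never block. One common way around that is to do the blocking consumption on a separate thread and let nextTuple() just poll an in-memory queue. The sketch below is one way to restructure it (class and field names are mine; it keeps the same high-level consumer settings and topic as above and does not deal with offsets or replay):
package test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
// Hypothetical reworking of the spout above: the blocking consumer loop runs on a
// background thread and nextTuple() only polls an in-memory queue, so Storm's
// event loop is never blocked.
public class QueueBackedKafkaSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private transient ConsumerConnector consumer;
    private transient LinkedBlockingQueue<String> queue;
    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.queue = new LinkedBlockingQueue<String>(10000);
        Properties props = new Properties();
        props.put("zookeeper.connect", "10.10.92.161:2181");
        props.put("group.id", "jd-group5");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("TEST-TOPIC2", 1);
        StringDecoder decoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> streams =
                consumer.createMessageStreams(topicCountMap, decoder, decoder);
        final ConsumerIterator<String, String> it = streams.get("TEST-TOPIC2").get(0).iterator();
        Thread reader = new Thread(new Runnable() {
            public void run() {
                // This loop may block, but it does so on its own thread, not inside nextTuple().
                while (it.hasNext()) {
                    queue.offer(it.next().message());
                }
            }
        });
        reader.setDaemon(true);
        reader.start();
    }
    @Override
    public void nextTuple() {
        String message = queue.poll(); // non-blocking; returns null if nothing is ready
        if (message != null) {
            collector.emit(new Values(message));
        } else {
            Utils.sleep(1); // nothing to emit; back off briefly instead of busy-spinning
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("source"));
    }
    @Override
    public void close() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}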
The code structure isn't especially polished; this is just an example.
These are really all examples I found online, integrated and modified a bit.
Just writing it down for the record.