My requirement is to pull data from Kafka, process it, and then save it to HBase.


1. Topology

Here the spout is the KafkaSpout class from storm-kafka-0.93.jar, used as the input source. The spout I wrote myself is included at the end; note that it has a problem and can hang the topology.


package test.kafka;

import java.util.Arrays;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import test.SimpleBolt;
import test.SimpleBolt2;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

public class KafkaTopology {
	public static void main(String[] args) {
        try {
            // Instantiate the TopologyBuilder.
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            // Configure the spout and its parallelism; the parallelism hint controls how many executors (threads) it gets in the cluster.
//            String zks = "h1:2181,h2:2181,h3:2181";
//            String topic = "my-replicated-topic5";
//            String zkRoot = "/storm"; // default zookeeper root configuration for storm
//            String id = "word";
//              
//            BrokerHosts brokerHosts = new ZkHosts(zks);
//            SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
//            spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
//            spoutConf.forceFromStart = true;
//            spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
//            spoutConf.zkPort = 2181;
 
            BrokerHosts brokerHosts = new ZkHosts("10.10.92.161:2181");
            String topic = "TEST-TOPIC3";
            // Configure the Kafka topic to subscribe to, plus the ZooKeeper root path and consumer id used to store offsets.
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, "/storm", "jd-group5");
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            spoutConfig.zkServers = Arrays.asList(new String[] {"10.10.92.161"});
            spoutConfig.zkPort = 2181;
            // startOffsetTime: -2 = read from the beginning of the Kafka log, -1 = read only the latest,
            // unset = resume from the offset stored in ZooKeeper (see the note after this class).
//          spoutConfig.startOffsetTime = Integer.valueOf(StormAppConfigUtil.get(CommonConstant.KAFKA_CONF_OFFSETTIME));
//          spoutConfig.forceFromStart = Boolean.valueOf(StormAppConfigUtil.get(CommonConstant.KAFKA_CONF_FORCESTART));
            spoutConfig.forceFromStart = false; // whether to re-read from the beginning every time; false = do not start over
            KafkaSpout receiver = new KafkaSpout(spoutConfig);
            topologyBuilder.setSpout("kafka-spout", receiver, 5).setNumTasks(10);
            // Configure the processing bolts and their parallelism; shuffleGrouping makes each bolt receive tuples from its upstream node at random.
            topologyBuilder.setBolt("kafka-bolt", new SimpleBolt(), 5).setNumTasks(10).shuffleGrouping("kafka-spout");
            topologyBuilder.setBolt("kafka-hbase-bolt", new SimpleBolt2(), 5).setNumTasks(10).shuffleGrouping("kafka-bolt");
            Config config = new Config();
            config.setDebug(false);
            if (args != null && args.length > 0) {
                /* Number of slots this topology claims in the Storm cluster; one slot corresponds to one worker process
                   on a supervisor node. If you ask for more slots than your physical nodes have workers, the submit may
                   not succeed. For example, if other topologies already occupy the cluster and only 2 workers are free,
                   a topology that requests 4 workers will submit but never actually run; once you kill some topologies
                   and slots are released, it starts running normally.
                */
                config.setNumWorkers(1);
                StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
            } else {
                // Startup code for running in local mode.
            	config.setNumWorkers(2);
                config.setMaxTaskParallelism(1);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("simple", config,
                        topologyBuilder.createTopology());
//                Thread.sleep(5000);
//                cluster.killTopology("simple");
//                cluster.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
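
A note on the offset comment in the topology above: those special values correspond to SpoutConfig.startOffsetTime, which can also be set with constants from Kafka's own API instead of raw numbers. A minimal sketch, assuming the Kafka 0.8 client classes are on the classpath (storm-kafka depends on them anyway):

            // Read from the earliest offset Kafka still retains (the "-2" case in the comment).
            spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

            // Or read only messages arriving from now on (the "-1" case):
            // spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

            // With neither set and forceFromStart = false, the spout resumes from the offset
            // committed in ZooKeeper under the configured zkRoot and consumer id.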


2. Bolt that reads data from Kafka and processes it

package test;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Created by IntelliJ IDEA. User: comaple.zhang Date: 12-8-28 Time: 2:11 PM
 */
@SuppressWarnings("serial")
public class SimpleBolt extends BaseRichBolt {
	
	private OutputCollector collector;

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("info","id"));
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {
		try {
			String mesg = input.getString(0);
			if (mesg != null) {
				collector.emit(new Values(mesg + "mesg is processed!", mesg));
//				System.out.println("Bolt" + this.hashCode() + ":" + mesg);
			}
			collector.ack(input);
		} catch (Exception e) {
			e.printStackTrace();
			// Fail the tuple and return; do not ack a tuple that has already been failed.
			collector.fail(input);
		}
	}
}
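
One thing worth noting about SimpleBolt: it emits an unanchored tuple, so when the HBase bolt later calls fail(), Storm has no link back to the spout's message and nothing gets replayed. If end-to-end replay matters, the emit can be anchored to the input tuple. A minimal sketch of what execute() could look like with anchoring (same fields as above):

	@Override
	public void execute(Tuple input) {
		try {
			String mesg = input.getString(0);
			if (mesg != null) {
				// Anchoring on "input" ties the emitted tuple to the spout's message,
				// so a downstream fail() propagates back and the message can be replayed.
				collector.emit(input, new Values(mesg + "mesg is processed!", mesg));
			}
			collector.ack(input);
		} catch (Exception e) {
			collector.fail(input);
		}
	}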

3. Bolt that inserts into HBase


package test;

import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

import config.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
 * Created by IntelliJ IDEA. User: comaple.zhang Date: 12-8-28 Time: 2:11 PM
 */
@SuppressWarnings("serial")
public class SimpleBolt2 extends BaseRichBolt {

	private OutputCollector collector;

	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {
		try {
			String id = input.getStringByField("id");
			String mesg = input.getStringByField("info");
			if (mesg != null) {
				Table table = Config.con.getTable(TableName.valueOf("xyz"));
				try {
					// One Put represents one row; each row needs a unique rowkey,
					// which here is the value passed into the Put constructor.
					Put put = new Put(id.getBytes());
					// First column of this row.
					put.addColumn("cf1".getBytes(), "val".getBytes(), mesg.getBytes());
					table.put(put);
				} finally {
					table.close(); // Table instances are lightweight but must be closed after use.
				}
			}
			collector.ack(input);
		} catch (Exception e) {
			e.printStackTrace();
			// Fail the tuple and return; do not ack a tuple that has already been failed.
			collector.fail(input);
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

	}
}
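
SimpleBolt2 opens a Table and writes a single Put per tuple, which is simple but not very fast. If throughput becomes an issue, HBase's BufferedMutator can batch the writes. A rough sketch follows; the class name BufferedHBaseBolt and the 4 MB buffer size are placeholders of mine, not part of the original code:

package test;

import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Put;

import config.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

@SuppressWarnings("serial")
public class BufferedHBaseBolt extends BaseRichBolt {

	private OutputCollector collector;
	private BufferedMutator mutator;

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		try {
			// One mutator per bolt instance; it buffers Puts and flushes them in batches.
			BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("xyz"))
					.writeBufferSize(4 * 1024 * 1024); // 4 MB buffer -- example value
			mutator = Config.con.getBufferedMutator(params);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public void execute(Tuple input) {
		try {
			Put put = new Put(input.getStringByField("id").getBytes());
			put.addColumn("cf1".getBytes(), "val".getBytes(),
					input.getStringByField("info").getBytes());
			mutator.mutate(put); // buffered; written out when the buffer fills
			collector.ack(input);
		} catch (Exception e) {
			collector.fail(input);
		}
	}

	@Override
	public void cleanup() {
		try {
			mutator.close(); // flushes any remaining buffered Puts
		} catch (Exception e) {
			// ignore on shutdown
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
	}
}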

4. An HBase configuration class

package config;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class Config {
	public static Configuration configuration;
	public static Connection con;
	static {
		configuration = HBaseConfiguration.create();
		configuration.set("hbase.zookeeper.property.clientPort", "2181");
		configuration.set("hbase.zookeeper.quorum", "10.10.92.151");
		try {
			con = ConnectionFactory.createConnection(configuration);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}
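
One caveat with this class: Config.con is a process-wide static connection that is never closed. A small sketch of a shutdown hook that could be appended to the same static block to release it when the worker JVM exits:

		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
			public void run() {
				try {
					if (con != null) {
						con.close(); // releases ZooKeeper sessions and HBase RPC resources
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}));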

Finally, here is the spout I wrote myself. Its problem is that it often hangs the topology. My workaround was to wait 1 millisecond after each emit, after which the problem stopped appearing, though I do not know why; I am still not very familiar with either Storm or Kafka. (A sketch of a possible fix follows the spout code below.)

package test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class KafkaSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;

	private static ConsumerConnector consumer;
	/**
	 * Initialize the collector and create the Kafka consumer connector here.
	 * 
	 * @param conf
	 * @param context
	 * @param collector
	 */
	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.collector = collector;
		Properties props = new Properties();
		// ZooKeeper connection string
		props.put("zookeeper.connect", "10.10.92.161:2181");

		// group.id identifies the consumer group
		props.put("group.id", "jd-group5");

		// ZooKeeper session timeout and sync settings
		props.put("zookeeper.session.timeout.ms", "4000");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");
		props.put("auto.offset.reset", "smallest");
		// serializer class
		props.put("serializer.class", "kafka.serializer.StringEncoder");

		ConsumerConfig config = new ConsumerConfig(props);

		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
	}

	/**
	 * Storm calls this method repeatedly; each call may emit one tuple into the topology.
	 */
	@Override
	public void nextTuple() {
		try {
			// NOTE: this is where the topology gets stuck. The message streams are created
			// inside nextTuple(), and the while (it.hasNext()) loop below never returns,
			// so the spout thread is blocked and Storm can no longer drive acks or emits.
			Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
			topicCountMap.put("TEST-TOPIC2", new Integer(1));

			StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
			StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

			Map<String, List<KafkaStream<String, String>>> consumerMap =
					consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
			KafkaStream<String, String> stream = consumerMap.get("TEST-TOPIC2").get(0);
			ConsumerIterator<String, String> it = stream.iterator();
			while (it.hasNext()) {
				String message = it.next().message();
				collector.emit(new Values(message));
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Declares the output field name. The field name does not matter much with shuffle
	 * grouping, but it matters a lot when grouping by field. The declarer can also be
	 * used via declarer.declareStream() to define stream ids for more complex topologies.
	 * 
	 * @param declarer
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("source"));
	}

	@Override
	public void ack(Object msgId) {
		System.out.println("任务执行完了:" + msgId);
	}

	@Override
	public void fail(Object msgId) {
		System.out.println("任务执行失败了:" + msgId);
	}
}
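
As for why the topology hangs: nextTuple() above creates the message streams inside the method and then blocks forever in while (it.hasNext()), so the single spout thread never returns control to Storm. A rough sketch of one way to restructure it, assuming Kafka's 0.8 high-level consumer (the class name NonBlockingKafkaSpout, the 100 ms consumer timeout and the 50 ms back-off are placeholder values of mine): create the stream once in open(), set consumer.timeout.ms so hasNext() cannot block indefinitely, and emit at most one message per nextTuple() call.

package test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class NonBlockingKafkaSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;
	private ConsumerConnector consumer;
	private ConsumerIterator<String, String> it;

	@SuppressWarnings("rawtypes")
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
		Properties props = new Properties();
		props.put("zookeeper.connect", "10.10.92.161:2181");
		props.put("group.id", "jd-group5");
		props.put("auto.offset.reset", "smallest");
		// Make hasNext() throw ConsumerTimeoutException instead of blocking forever.
		props.put("consumer.timeout.ms", "100");
		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

		// Create the message stream ONCE, not on every nextTuple() call.
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put("TEST-TOPIC2", 1);
		StringDecoder decoder = new StringDecoder(new VerifiableProperties());
		Map<String, List<KafkaStream<String, String>>> streams =
				consumer.createMessageStreams(topicCountMap, decoder, decoder);
		it = streams.get("TEST-TOPIC2").get(0).iterator();
	}

	@Override
	public void nextTuple() {
		try {
			// Emit at most one message per call so Storm can keep driving the spout loop.
			if (it.hasNext()) {
				collector.emit(new Values(it.next().message()));
			}
		} catch (ConsumerTimeoutException e) {
			// No message arrived within consumer.timeout.ms; back off briefly.
			Utils.sleep(50);
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("source"));
	}
}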




The structure here is nothing special; it is just an example.

Most of it is pieced together from examples found online, with some integration and modification of my own.

Recording it here for future reference.







