Kafka learning, part 4: flume + kafka + storm
Reposted from: http://blog.csdn.net/desilting/article/details/23194039
Configure Flume:
http://blog.csdn.net/desilting/article/details/22811593
The conf/flume-conf.properties file:
- producer.sources = s
- producer.channels = c
- producer.sinks = r
- producer.sources.s.channels = c
- producer.sources.s.type= netcat
- producer.sources.s.bind= 192.168.40.134
- producer.sources.s.port= 44444
- producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
- producer.sinks.r.metadata.broker.list=192.168.40.134:9092
- producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
- producer.sinks.r.request.required.acks=1
- producer.sinks.r.max.message.size=1000000
- producer.sinks.r.custom.topic.name=mykafka
- producer.sinks.r.channel = c
- producer.channels.c.type = memory
- producer.channels.c.capacity = 1000
Configure Kafka:
http://blog.csdn.net/desilting/article/details/22872839
Start ZooKeeper, Kafka, and Storm.
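The post doesn't list the start commands. For reference, with a Kafka 0.8-style layout and a standard Storm install they would look roughly like this (all paths are assumptions):
- # from the Kafka directory (ZooKeeper can also be a standalone install):
- bin/zookeeper-server-start.sh config/zookeeper.properties &
- bin/kafka-server-start.sh config/server.properties &
- # from the Storm directory:
- bin/storm nimbus &
- bin/storm supervisor &
- bin/storm ui &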
Create a topic:
bin/kafka-topics.sh --create --zookeeper 192.168.40.132:2181 --replication-factor 3 --partitions 1 --topic mykafka
Describe the topic:
bin/kafka-topics.sh --describe --zookeeper 192.168.40.132:2181
Topic:mykafka   PartitionCount:1   ReplicationFactor:3   Configs:
    Topic: mykafka   Partition: 0   Leader: 134   Replicas: 133,134,132   Isr: 134,133,132
| Field | Meaning |
| --- | --- |
| partition | A topic can be split into multiple partitions; its messages are distributed across them to increase parallelism. |
| leader | The broker handling all reads and writes for this partition; any broker may become the leader of a given partition. |
| replicas | The brokers holding a copy of this partition, regardless of whether they are alive. |
| isr | The replicas that are currently alive and in sync ("in-sync replicas"). |
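To confirm that messages actually land on the topic, you can attach the console consumer that ships with Kafka 0.8 (addresses as in the cluster above):
- bin/kafka-console-consumer.sh --zookeeper 192.168.40.132:2181 --topic mykafka --from-beginning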
Start the Flume agent:
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console
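Once the agent is up, a quick generic check (not from the original post) that the netcat source is listening:
- netstat -tlnp | grep 44444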
The KafkaSink class:
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.Map;
- import java.util.Properties;
- import kafka.javaapi.producer.Producer;
- import kafka.producer.KeyedMessage;
- import kafka.producer.ProducerConfig;
- import org.apache.flume.Channel;
- import org.apache.flume.Context;
- import org.apache.flume.Event;
- import org.apache.flume.Transaction;
- import org.apache.flume.conf.Configurable;
- import org.apache.flume.sink.AbstractSink;
- import com.google.common.base.Preconditions;
- import com.google.common.collect.ImmutableMap;
- public class KafkaSink extends AbstractSink implements Configurable {
-     private Context context;
-     private Properties parameters;
-     private Producer<String, String> producer;
-     private static final String PARTITION_KEY_NAME = "custom.partition.key";
-     private static final String CUSTOM_TOPIC_KEY_NAME = "custom.topic.name";
-     private static final String DEFAULT_ENCODING = "UTF-8";
-     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);
-     // Copy every producer.sinks.r.* property from the Flume context into a
-     // Properties object; Kafka's ProducerConfig picks out the keys it knows
-     // (metadata.broker.list, serializer.class, request.required.acks, ...).
-     public void configure(Context context) {
-         this.context = context;
-         ImmutableMap<String, String> props = context.getParameters();
-         this.parameters = new Properties();
-         for (Map.Entry<String, String> entry : props.entrySet()) {
-             this.parameters.put(entry.getKey(), entry.getValue());
-         }
-     }
-     @Override
-     public synchronized void start() {
-         super.start();
-         ProducerConfig config = new ProducerConfig(this.parameters);
-         this.producer = new Producer<String, String>(config);
-     }
-     @Override
-     public Status process() {
-         Status status = null;
-         Channel channel = getChannel();
-         Transaction transaction = channel.getTransaction();
-         try {
-             transaction.begin();
-             Event event = channel.take();
-             if (event != null) {
-                 // getProperty with a default avoids an NPE when no partition key is configured
-                 String partitionKey = parameters.getProperty(PARTITION_KEY_NAME, "");
-                 String topic = Preconditions.checkNotNull(
-                         parameters.getProperty(CUSTOM_TOPIC_KEY_NAME), "topic name is required");
-                 String eventData = new String(event.getBody(), DEFAULT_ENCODING);
-                 KeyedMessage<String, String> data = partitionKey.isEmpty()
-                         ? new KeyedMessage<String, String>(topic, eventData)
-                         : new KeyedMessage<String, String>(topic, partitionKey, eventData);
-                 LOGGER.info("Sending Message to Kafka : [" + topic + ":" + eventData + "]");
-                 producer.send(data);
-                 transaction.commit();
-                 LOGGER.info("Send message success");
-                 status = Status.READY;
-             } else {
-                 // nothing in the channel: roll back and let the runner back off
-                 transaction.rollback();
-                 status = Status.BACKOFF;
-             }
-         } catch (Exception e) {
-             LOGGER.error("Send message failed!", e);
-             transaction.rollback();
-             status = Status.BACKOFF;
-         } finally {
-             transaction.close();
-         }
-         return status;
-     }
-     @Override
-     public synchronized void stop() {
-         producer.close();
-         super.stop();
-     }
- }
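The post doesn't show how the sink is deployed. Assuming a plain javac/jar build, something like this would make the class visible to Flume (paths and jar names are assumptions; the Kafka client jars and their dependencies must land in Flume's lib directory as well):
- javac -cp "$FLUME_HOME/lib/*:$KAFKA_HOME/libs/*" org/apache/flume/plugins/KafkaSink.java
- jar cf kafka-sink.jar org/apache/flume/plugins/KafkaSink.class
- cp kafka-sink.jar "$FLUME_HOME/lib/"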
For the KafkaSpout I referenced someone else's code:
https://github.com/HolmesNL/kafka-spout, which needs some modifications.
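The post doesn't include the modified spout. Purely as a hedged sketch, here is what a spout with the constructor used in main() below (topic, consumer group, ZooKeeper address) might look like on Kafka 0.8's high-level consumer API. This is an assumption for orientation only, not the HolmesNL code and not the author's actual changes; the output field name "word" is likewise assumed to match the bolt's tuple.getString(0):
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
- import kafka.consumer.Consumer;
- import kafka.consumer.ConsumerConfig;
- import kafka.consumer.ConsumerIterator;
- import kafka.consumer.ConsumerTimeoutException;
- import kafka.consumer.KafkaStream;
- import kafka.javaapi.consumer.ConsumerConnector;
- import backtype.storm.spout.SpoutOutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.topology.base.BaseRichSpout;
- import backtype.storm.tuple.Fields;
- import backtype.storm.tuple.Values;
- public class KafkaSpout extends BaseRichSpout {
-     private final String topic;
-     private final String groupId;
-     private final String zkConnect;
-     private transient ConsumerConnector consumer;
-     private transient ConsumerIterator<byte[], byte[]> iterator;
-     private transient SpoutOutputCollector collector;
-     public KafkaSpout(String topic, String groupId, String zkConnect) {
-         this.topic = topic;
-         this.groupId = groupId;
-         this.zkConnect = zkConnect;
-     }
-     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-         this.collector = collector;
-         Properties props = new Properties();
-         props.put("zookeeper.connect", zkConnect);
-         props.put("group.id", groupId);
-         // make hasNext() time out instead of blocking nextTuple() forever
-         props.put("consumer.timeout.ms", "100");
-         consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
-         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-         topicCountMap.put(topic, 1);
-         Map<String, List<KafkaStream<byte[], byte[]>>> streams =
-                 consumer.createMessageStreams(topicCountMap);
-         iterator = streams.get(topic).get(0).iterator();
-     }
-     public void nextTuple() {
-         try {
-             if (iterator.hasNext()) {
-                 collector.emit(new Values(new String(iterator.next().message(), "UTF-8")));
-             }
-         } catch (ConsumerTimeoutException e) {
-             // no message within consumer.timeout.ms; fall through and try again
-         } catch (java.io.UnsupportedEncodingException e) {
-             throw new RuntimeException(e);
-         }
-     }
-     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-         // single field consumed as tuple.getString(0) by ExclamationBolt
-         declarer.declare(new Fields("word"));
-     }
-     public void close() {
-         if (consumer != null) consumer.shutdown();
-     }
- }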
The Storm test topology:
- import java.util.Map;
- import backtype.storm.Config;
- import backtype.storm.LocalCluster;
- import backtype.storm.StormSubmitter;
- import backtype.storm.metric.LoggingMetricsConsumer;
- import backtype.storm.metric.api.CountMetric;
- import backtype.storm.metric.api.MeanReducer;
- import backtype.storm.metric.api.MultiCountMetric;
- import backtype.storm.metric.api.ReducedMetric;
- import backtype.storm.task.OutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.topology.TopologyBuilder;
- import backtype.storm.topology.base.BaseRichBolt;
- import backtype.storm.tuple.Fields;
- import backtype.storm.tuple.Tuple;
- import backtype.storm.tuple.Values;
- import backtype.storm.utils.Utils;
- public class ExclamationTopology {
-     public static class ExclamationBolt extends BaseRichBolt {
-         OutputCollector _collector;
-         transient CountMetric _countMetric;
-         transient MultiCountMetric _wordCountMetric;
-         transient ReducedMetric _wordLengthMeanMetric;
-         public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-             _collector = collector;
-             initMetrics(context);
-         }
-         public void execute(Tuple tuple) {
-             // append "!!!" to the incoming word, anchoring the new tuple to the old one
-             _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-             _collector.ack(tuple);
-             updateMetrics(tuple.getString(0));
-         }
-         void updateMetrics(String word) {
-             _countMetric.incr();
-             _wordCountMetric.scope(word).incr();
-             _wordLengthMeanMetric.update(word.length());
-         }
-         public void declareOutputFields(OutputFieldsDeclarer declarer) {
-             declarer.declare(new Fields("word"));
-         }
-         void initMetrics(TopologyContext context) {
-             _countMetric = new CountMetric();
-             _wordCountMetric = new MultiCountMetric();
-             _wordLengthMeanMetric = new ReducedMetric(new MeanReducer());
-             // the last argument is the reporting interval in seconds
-             context.registerMetric("execute_count", _countMetric, 5);
-             context.registerMetric("word_count", _wordCountMetric, 60);
-             context.registerMetric("word_length", _wordLengthMeanMetric, 60);
-         }
-     }
-     public static void main(String[] args) throws Exception {
-         TopologyBuilder builder = new TopologyBuilder();
-         // cluster mode: args = {topologyName, topic}; local mode: args = {topic}
-         String topic = args.length == 2 ? args[1] : args[0];
-         KafkaSpout kafkaSpout = new KafkaSpout(topic, "testKafkaGroup", "192.168.40.132:2181");
-         builder.setSpout("word", kafkaSpout, 10);
-         builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
-         builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
-         Config conf = new Config();
-         conf.setDebug(true);
-         // write the metrics registered above to logs/metrics.log via two consumer tasks
-         conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);
-         if (args != null && args.length == 2) {
-             conf.setNumWorkers(3);
-             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
-         } else {
-             LocalCluster cluster = new LocalCluster();
-             cluster.submitTopology("test", conf, builder.createTopology());
-             Utils.sleep(10000);
-             cluster.killTopology("test");
-             cluster.shutdown();
-         }
-     }
- }
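For reference (not shown in the original post), the two launch modes implied by main() would be invoked roughly like this, assuming the topology is packaged as an uber-jar and the class sits in the default package (jar and class names are assumptions):
- # local mode: the single argument is the topic
- storm jar exclamation-topology.jar ExclamationTopology mykafka
- # cluster mode: first argument is the topology name, second is the topic
- storm jar exclamation-topology.jar ExclamationTopology exclamationTest mykafka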
Test results:
telnet 192.168.40.134 44444
Type some arbitrary characters (ASDF and so on) at the telnet prompt:
- ASDF
- OK
- ASD
- OK
- F
- OK
- ASDF
- OK
The Flume side logs:
- 2014-04-08 01:30:44,379 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka : [mykafka:ASDF]
- 2014-04-08 01:30:44,387 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:80)] Send message success
- 2014-04-08 01:30:44,604 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka : [mykafka:ASD]
- 2014-04-08 01:30:44,611 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:80)] Send message success
- 2014-04-08 01:30:44,794 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka : [mykafka:F]
- 2014-04-08 01:30:44,799 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:80)] Send message success
- 2014-04-08 01:30:45,038 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka : [mykafka:ASDF]
Finally, records like the following show up in Storm's logs/metrics.log file:
- 2014-04-08 01:30:28,446 495106 1396945828 ubun