14.5 Storm: receiving data from Kafka and writing it back to Kafka
package storm.starter.chenbo;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.spout.MultiScheme;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Properties;
/**
 * This topology reads log lines from a Kafka topic, filters out the lines
 * containing "ERROR", and writes them to another Kafka topic.
 */
public class LogFilterTopology {

    public static class FilterBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String line = tuple.getString(0);
            if (line.contains("ERROR")) {
                System.err.println(line);
                collector.emit(new Values(line));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // The field is named "message" so that the downstream
            // FieldNameBasedTupleToKafkaMapper can pick it up as the Kafka message body.
            declarer.declare(new Fields("message"));
        }
    }
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Configure the Kafka spout: the topic the raw log data is read from.
        String topic = "mylog";
        ZkHosts zkHosts = new ZkHosts("192.168.188.4:4180,192.168.188.5:4180,192.168.188.6:4180");
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic,
                "/MyKafka",  // ZooKeeper root path under which the consumed offsets are stored
                "MyTrack");  // consumer id; one per application
        List<String> zkServers = new ArrayList<String>();
        System.out.println(zkHosts.brokerZkStr);
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 4180;
        spoutConfig.forceFromStart = false; // false: resume from the last recorded offset rather than the beginning
        spoutConfig.socketTimeoutMs = 60 * 1000;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // emit each Kafka message as a String
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // set kafka spout
        builder.setSpout("kafka_spout", kafkaSpout, 3);

        // set filter bolt
        builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout");

        // set kafka bolt: the Kafka topic the filtered data is written to (mylog_ERROR)
        KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("mylog_ERROR"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
        builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filter");

        Config conf = new Config();
        // set producer properties
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.188.4:9092,192.168.188.5:9092,192.168.188.6:9092");
        props.put("request.required.acks", "1"); // possible values: 0, 1, -1
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", props);
        conf.setNumWorkers(4);

        // To run on a cluster instead of locally:
        // StormSubmitter.submitTopologyWithProgressBar("logfilter", conf, builder.createTopology());
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("logfilter", conf, builder.createTopology());
    }
}
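To feed the topology with test data, you can push a few log lines into the mylog topic. Below is a minimal sketch of such a producer, assuming the same Kafka 0.8-style producer API (kafka.javaapi.producer.Producer) and the broker list used above; the class name TestLogProducer and the sample messages are made up for illustration.

package storm.starter.chenbo;

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Hypothetical test producer: writes a few sample log lines into the "mylog" topic.
public class TestLogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.188.4:9092,192.168.188.5:9092,192.168.188.6:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // Only the line containing "ERROR" should end up in mylog_ERROR.
        producer.send(new KeyedMessage<String, String>("mylog", "2016-01-01 12:00:00 INFO everything is fine"));
        producer.send(new KeyedMessage<String, String>("mylog", "2016-01-01 12:00:01 ERROR something went wrong"));
        producer.close();
    }
}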
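To verify the output you can consume the mylog_ERROR topic, either with the kafka-console-consumer script or with a small program like the sketch below, which assumes the Kafka 0.8 high-level consumer API and the same ZooKeeper ensemble; the class name ErrorLogConsumer and the group id are made up for illustration.

package storm.starter.chenbo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

// Hypothetical verification consumer: prints every message arriving on "mylog_ERROR".
public class ErrorLogConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.188.4:4180,192.168.188.5:4180,192.168.188.6:4180");
        props.put("group.id", "mylog_error_checker");
        props.put("auto.offset.reset", "smallest"); // read the topic from the beginning

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("mylog_ERROR", 1); // one consuming thread for the topic
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);

        ConsumerIterator<byte[], byte[]> it = streams.get("mylog_ERROR").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}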