storm+kafka集成简单应用
这两天公司要学习kafka,结合之前的storm,做了一个简单的集成。之前也参考了网上的一些例子,发现或多或少都有一些问题,所以自己做了一个。下面摘录一下网上其他人遇到的问题,防止以后自己和大家再次出现:基本场景是应用出现错误时,发送日志到kafka的某个topic,storm订阅该topic,然后进行后续处理。场景非常简单,但是在学习过程中还是遇到了不少问题。
·
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
import
backtype.storm.Config;
import
backtype.storm.LocalCluster;
import
backtype.storm.StormSubmitter;
import
backtype.storm.spout.SchemeAsMultiScheme;
import
backtype.storm.topology.IBasicBolt;
import
backtype.storm.topology.TopologyBuilder;
import
backtype.storm.utils.Utils;
import
storm.kafka.BrokerHosts;
import
storm.kafka.KafkaSpout;
import
storm.kafka.SpoutConfig;
import
storm.kafka.ZkHosts;
import
storm.kafka.bolt.KafkaBolt;
import
java.util.Properties;
/**
 * Wires a Kafka-in / Kafka-out Storm topology: a KafkaSpout reads raw
 * messages from topic "msgTopic1", TopicMsgBolt decorates each message,
 * and a KafkaBolt republishes the result to topic "msgTopic2".
 *
 * Run with no arguments for an in-process LocalCluster demo (killed after
 * ~100s); pass a topology name as args[0] to submit to a real cluster.
 */
public class TopicMsgTopology {

    public static void main(String[] args) throws Exception {
        // Zookeeper ensemble the Kafka spout uses for broker/offset discovery.
        BrokerHosts zkHosts = new ZkHosts("localhost:2181");

        // Subscribe to "msgTopic1"; offsets are tracked under the given
        // Zookeeper root path with the given consumer id.
        SpoutConfig kafkaSpoutConfig =
                new SpoutConfig(zkHosts, "msgTopic1", "/topology/root1", "topicMsgTopology");
        kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        // Producer-side properties consumed by KafkaBolt.
        Properties brokerProps = new Properties();
        brokerProps.put("metadata.broker.list", "localhost:9092");
        // serializer.class is the message serializer used by the producer.
        brokerProps.put("serializer.class", "kafka.serializer.StringEncoder");

        Config topologyConf = new Config();
        topologyConf.put("kafka.broker.properties", brokerProps);
        // Destination topic for tuples emitted by KafkaBolt.
        topologyConf.put("topic", "msgTopic2");

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("msgKafkaSpout", new KafkaSpout(kafkaSpoutConfig));
        topologyBuilder
                .setBolt("msgSentenceBolt", (IBasicBolt) new TopicMsgBolt())
                .shuffleGrouping("msgKafkaSpout");
        topologyBuilder
                .setBolt("msgKafkaBolt", new KafkaBolt<String, Integer>())
                .shuffleGrouping("msgSentenceBolt");

        if (args.length == 0) {
            // Local mode: run briefly inside this JVM, then tear down.
            String localName = "kafkaTopicTopology";
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(localName, topologyConf, topologyBuilder.createTopology());
            Utils.sleep(100000);
            localCluster.killTopology(localName);
            localCluster.shutdown();
        } else {
            // Cluster mode: args[0] names the topology on the remote cluster.
            topologyConf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], topologyConf, topologyBuilder.createTopology());
        }
    }
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * Deserialization scheme for the Kafka spout: decodes each raw Kafka message
 * payload as a UTF-8 string and emits it as the single tuple field "msg".
 */
public class MessageScheme implements Scheme {

    private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class);

    /**
     * Decodes one Kafka message payload into a tuple.
     *
     * <p>Fix: the original wrapped {@code new String(ser, "UTF-8")} in a
     * try/catch and returned {@code null} on {@link java.io.UnsupportedEncodingException},
     * which would NPE inside Storm's emit path. UTF-8 is a charset every JVM
     * is required to support, so using {@code StandardCharsets.UTF_8} removes
     * both the checked exception and the null-returning branch.
     *
     * @param ser raw message bytes from Kafka
     * @return a single-element Values list containing the decoded string
     */
    @Override
    public List<Object> deserialize(byte[] ser) {
        String msg = new String(ser, StandardCharsets.UTF_8);
        logger.info("get one message is {}", msg);
        return new Values(msg);
    }

    /** @return the one output field ("msg") this scheme declares. */
    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import
backtype.storm.topology.BasicOutputCollector;
import
backtype.storm.topology.OutputFieldsDeclarer;
import
backtype.storm.topology.base.BaseBasicBolt;
import
backtype.storm.tuple.Fields;
import
backtype.storm.tuple.Tuple;
import
backtype.storm.tuple.Values;
import
org.slf4j.Logger;
import
org.slf4j.LoggerFactory;
/**
 * Intermediate bolt: takes the incoming message string (field 0), wraps it
 * in a human-readable sentence, logs it, and emits it as field "message"
 * for the downstream KafkaBolt to publish.
 */
public class TopicMsgBolt extends BaseBasicBolt {

    private static final Logger logger = LoggerFactory.getLogger(TopicMsgBolt.class);

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Field 0 is the "msg" string produced by MessageScheme.
        String incoming = input.getString(0);
        String outgoing = "Message got is '" + incoming + "'!";
        logger.info("out={}", outgoing);
        collector.emit(new Values(outgoing));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}
|
1
2
|
# 创建topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic msgTopic1
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic msgTopic2
|
1
2
|
# 对msgTopic1启动producer,用于发送数据
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic msgTopic1
# 对msgTopic2启动consumer,用于查看发送数据的处理结果
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic msgTopic2 --from-beginning
|
更多推荐
已为社区贡献1条内容
所有评论(0)