storm-kafka数据读取问题
在storm的bolt中,接受kafka会出现数据读不到的问题:控制台不报错,但是有如上的提示,提醒有Fetched 31 messages from:和 Added 31 messages from:,但是并未对数据进行处理和接受。产生的原因是这样的,下面分两种情况讨论:1.bolt只接受一个spout:如:builder.setSpout("readlog", new KafkaSpo
·
在storm的bolt中,接受kafka会出现数据读不到的问题:
控制台不报错,但是有如上的提示,提醒有Fetched 31 messages from:和 Added 31 messages from:,但是并未对数据进行处理和接受。
产生的原因是这样的,下面分两种情况讨论:
- 1.bolt只接受一个spout:
如:
builder.setSpout("readlog", new KafkaSpout(dataConfig),1);
builder.setBolt("FPGrowth", new FPGrowthBolt(), 1).shuffleGrouping("readlog");
在FPGrowthBolt中无需画蛇添足地加上
if(input.getFields().contains("readlog"))
{
String line = (String)input.getValueByField("readlog");
}
否则就会读不到kafka传来的数据
所以可以直接写上
String line = input.getString(0);
- 2.bolt只接受两个spout:
如:
builder.setSpout("systemin", new KafkaSpout(inConfig),1);
builder.setBolt("PatternTree", new PatternTreeBolt(),1).shuffleGrouping("FPGrowth").shuffleGrouping("systemin");
对于PatternTreeBolt来说,接受了两个输入,FpGrowth和systemin,而systemin是由kafka传入数据的,对于Bolt来说,接受kafka传入的数据,与从spout和bolt接受数据有所不同,需要区别对待。
当写下如下代码时:
if (input.getFields().contains("FPGrowth"))
{
SecPackData mesg = (SecPackData) input.getValueByField("FPGrowth");
}
else if (input.getFields().contains("systemin")){
String str;
str = (String)input.getValueByField("systemin");
}
就会读取不到systemin通过kafka传来的数据,所以应改成如下代码:
if (input.getFields().contains("FPGrowth"))
{
SecPackData mesg = (SecPackData) input.getValueByField("FPGrowth");
}
else {
String str;
str = input.getString(0);
}
更多推荐
已为社区贡献5条内容
所有评论(0)