kafka线程池消费
阅读官网文档很有必要http://kafka.apache.org/documentation.html分区、Offset、消费线程、group.id的关系一组(类)消息通常由某个topic来归类,我们可以把这组消息“分发”给若干个分区(partition),每个分区的消息各不相同;每个分区都维护着他自己的偏移量(Offset),记录着该分区的消息此时被消费的位置;一个消费线程可以对应...
·
阅读官网文档很有必要
http://kafka.apache.org/documentation.html
分区、Offset、消费线程、group.id的关系
- 一组(类)消息通常由某个topic来归类,我们可以把这组消息“分发”给若干个分区(partition),每个分区的消息各不相同;
- 每个分区都维护着他自己的偏移量(Offset),记录着该分区的消息此时被消费的位置;
- 一个消费线程可以对应若干个分区,但一个分区只能被具体某一个消费线程消费;
- group.id用于标记某一个消费组,每一个消费组都会被记录他在某一个分区的Offset,即不同consumer group针对同一个分区,都有“各自”的偏移量。
1.2 Kafka的使用场景:
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和storm
#kafka接收源报文
kafkaProp:
#测试环境zk
# zk: 10.11.24.21:2181,10.11.24.22:2181,10.11.24.23:2181
zk: 1.11.11.22:2181,1.11.11.22:2181,1.11.11.22:2181
#
groupId: account_cascade_client_11
timeOut: 3000
syncTime: 100
interval: 1000
topics:
- req.log.converted
@Data
@AllArgsConstructor
@NoArgsConstructor
@Configuration
@ConfigurationProperties(prefix = "kafkaProp")
public class KafkaProp {
private String zk;
private String groupId;
private String timeOut;
private String interval;
private String syncTime;
private List<String> topics;
}
@Component
public class casCadeMain implements CommandLineRunner {
@Autowired
private KafkaProp kafkaProp;
@Autowired
private CasCadeConsumer acceptConsumer;
@Autowired
private IHandler iHandler;
@Resource(name="tacheHandlerContainer")
private Map<String, AbStractTacheHandler> allHandlers;
@Override
public void run(String... args) throws Exception {
// init();
initConsumerMain();
}
private void initConsumerMain() {
ConsumerMain example = new ConsumerMain(iHandler,kafkaProp,allHandlers);
example.run(3);
}
}
ConsumerMain
public class ConsumerMain {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
private IHandler iHandler;
private Map<String, AbStractTacheHandler> allHandlers;
public ConsumerMain(IHandler iHandler, String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
this.iHandler = iHandler;
}
public ConsumerMain(IHandler iHandler, KafkaProp kafkaProp, Map<String, AbStractTacheHandler> allHandlers) {
String zooKeeper = kafkaProp.getZk();
String groupId = kafkaProp.getGroupId();
String topic = kafkaProp.getTopics().get(0);
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zooKeeper, groupId));
this.topic = topic;
this.iHandler = iHandler;
this.allHandlers = allHandlers;
}
public void shutdown() {
if (consumer != null){
consumer.shutdown();
}
if (executor != null){
executor.shutdown();
}
}
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(a_numThreads);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerThread(iHandler,stream, allHandlers));
threadNumber++;
}
}
}
ConsumerThread线程类
package com.staryea.accountCascade.kafka;
import com.app.frame.util.logging.Log;
import com.app.frame.util.logging.LogFactory;
import com.staryea.accountCascade.core.CasCadeConsumer;
import com.staryea.accountCascade.inft.AbStractTacheHandler;
import com.staryea.accountCascade.inft.IHandler;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import java.util.Map;
public class ConsumerThread implements Runnable {
private final Log log = LogFactory.getLog(CasCadeConsumer.class);
private KafkaStream m_stream;
private Map<String, AbStractTacheHandler> allHandlers;
private IHandler iHandler;
public ConsumerThread(IHandler iHandler, KafkaStream a_stream, Map<String, AbStractTacheHandler> allHandlers) {
this.allHandlers = allHandlers;
this.m_stream = a_stream;
this.iHandler = iHandler;
}
@Override
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext()) {
String jsonItem = new String(it.next().message());
AbStractTacheHandler tacheHandler = iHandler.choose(jsonItem);
if(tacheHandler == null){
//log.warn("未解析到json处理器,json: " + jsonItem);
continue;
}
try {
tacheHandler.work(jsonItem);
} catch (Exception e) {
log.error(e.getMessage(),e);
e.printStackTrace();
}
}
}
}
更多推荐
已为社区贡献3条内容
所有评论(0)