最近想把老storm集群上的部分任务,迁移到新搭建的storm集群上。

BrokerHosts hosts = new ZkHosts(Constants.ZOOKEEPER_HOSTS);
虽然都是从一个kafka集群取数,但新老storm集群对应的ZK地址不一样,迁移过去后消息消费没有续上。


老storm集群上运行的任务,在zk集群上的topic消费偏移量一直在更新:

[zk: localhost(CONNECTED) 21] get /ad_***@***/online-1/partition_9
{"topology":{"id":"c412eaff-42c4-4bc6-baf0-5fff4633be03","name":"ad_***"},"offset":1148171,"partition":9,"broker":{"host":"***","port":9092},"topic":"ad_***"}
cZxid = 0x1a9a2a6758
ctime = Wed Jun 1514:24:58CST 2016
mZxid = 0x1e001f6e2f
mtime = Tue Nov 1518:57:22CST 2016
pZxid = 0x1a9a2a6758
cversion = 0
dataVersion = 836004
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 204
numChildren = 0
[zk: localhost(CONNECTED) 22] get /ad_***@***/online-1/partition_9
{"topology":{"id":"c412eaff-42c4-4bc6-baf0-5fff4633be03","name":"ad_***"},"offset":1148177,"partition":9,"broker":{"host":"***","port":9092},"topic":"ad_***"}
cZxid = 0x1a9a2a6758
ctime = Wed Jun 1514:24:58CST 2016
mZxid = 0x1e00202046
mtime = Tue Nov 1518:58:38CST 2016
pZxid = 0x1a9a2a6758
cversion = 0
dataVersion = 836009
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 204
numChildren = 0

 

转移到新storm集群上的任务,在zk上的偏移量,一直没更新过:

[zk: localhost(CONNECTED) 29] get /***_log_hz@***/OnLine-2/partition_9
{"topology":{"id":"aeef3c11-4f58-4baa-9f25-cae3bf25e7d2","name":"***_log_hz@***"},"offset":4860271981,"partition":9,"broker":{"host":"10.***","port":9092},"topic":"***_log"}
cZxid = 0x1c0cc06551
ctime = Thu Oct 2011:30:10CST 2016
mZxid = 0x1c49d95a71
mtime = Wed Nov 0916:47:03CST 2016
pZxid = 0x1c0cc06551
cversion = 0
dataVersion = 513318
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 199
numChildren = 0

 

从kafkaspout源码中看到更新偏移量的函数:

publicvoidcommit() {
...
_state.writeJSON(committedPath(), data);
...
}


_state的初始化在:

Map stateConf = newHashMap(conf);
List<String> zkServers = _spoutConfig.zkServers;
if(zkServers == null) {
zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
}
Integer zkPort = _spoutConfig.zkPort;
if(zkPort == null) {
zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
}
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = newZkState(stateConf);

 

而_spoutConfig.zkServers并没有被主动赋值过,因此取的是Config.STORM_ZOOKEEPER_SERVERS配置项,即新storm集群用的zk地址:storm.zookeeper.servers
在此zk集群上,没有创建storm任务的zk路径,偏移量也没有更新。

因此,在代码中将zkServers变量赋值为原来zookeeper的地址,看是否会更新偏移量:

#publicfinalstatic String STORM_ZOOKEEPER_SERVERS = "10.***0,10.***1,10.***2";
spoutConfig.zkServers = Lists.newArrayList(Constants.STORM_ZOOKEEPER_SERVERS.split(","));
KafkaSpout kafkaSpout = newKafkaSpout(spoutConfig);


为了防止生效后,kafkaspout读取到几天前的偏移量,下面将groupid修改成了OnLine-3,可以看到offset开始更新了:

[zk: localhost(CONNECTED) 35] get /***_log_hz@***/OnLine-3/partition_9
{"topology":{"id":"f67da6a9-d136-449e-b00c-ed14eefe6a9c","name":"***_log_hz@***"},"offset":5957447236,"partition":9,"broker":{"host":"10.***","port":9092},"topic":"***_log"}
cZxid = 0x1e0050ca16
ctime = Tue Nov 1520:30:32CST 2016
mZxid = 0x1e00516f84
mtime = Tue Nov 1520:31:43CST 2016
pZxid = 0x1e0050ca16
cversion = 0
dataVersion = 34
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 199
numChildren = 0
[zk: localhost(CONNECTED) 36] get /***_log_hz@***/OnLine-3/partition_9
{"topology":{"id":"f67da6a9-d136-449e-b00c-ed14eefe6a9c","name":"***_log_hz@***"},"offset":5957454675,"partition":9,"broker":{"host":"10.***","port":9092},"topic":"***_log"}
cZxid = 0x1e0050ca16
ctime = Tue Nov 1520:30:32CST 2016
mZxid = 0x1e00517378
mtime = Tue Nov 1520:31:45CST 2016
pZxid = 0x1e0050ca16
cversion = 0
dataVersion = 35
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 199
numChildren = 0


因此,在老storm集群上跑的任务,往新storm集群上迁移时,只要保证kafkaspout的zk路径不变,并设置好zk地址,就能平滑迁移。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐