目前的需求是将kafka管理的相关操作,如创建/删除/修改topic等集成到web 管理端, 但有个问题是执行的时候是打印到服务端标准输出的(控制台),没法返回给前端页面,于是,简单做了下修改,这样便完成了从标准输出获取信息,返回给前端。思路比较简单:

ByteArrayOutputStream bout = new ByteArrayOutputStream(); 

public Map<String, Object> topicOpt(@RequestParam Long clusterId,
			@RequestParam String cmd) {
		Cluster cluster = clusterService.findByClusterId(clusterId);
		String topicName = null;
		String msg = "";
		
		if (cluster!=null && cmd!=null) {
			String[] options = cmd.trim().replaceAll("\\s+", ",").split(",");
			for (int i=0; i<options.length; i++) {
				if (options[i].equals("--zookeeper")) {
					options[i+1] = cluster.getZkConnect();
				}
				if (options[i].equals("--topic")) {
					topicName = options[i+1];
				}
			}
			//调用管理命令创建topic
			PrintStream out = System.out;  		        
		        PrintStream ps = new PrintStream(bout);
		        System.setOut(ps);     // 这样做的原因是从标准输出流中获取命令执行的输出信息,返回给前端
		        TopicCommand.main(options);
			msg = new String(bout.toByteArray());
<span style="white-space:pre">			</span>bout.reset();
			System.setOut(out);		// 恢复现场	
			
			// 将元数据添加入数据库表中
			Topic topic = new Topic();
			topic.setClusterId(clusterId);
			topic.setName(topicName);
			topic.setRemarks(cmd);
			topicService.addTopic(topic);
		}
		return JsonResult.resultSuccess(msg);
	}


Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐