Java操作Kafka(KafkaAdminClient Api)
AdminClient client = null;try {Properties properties = new Properties();properties.put("bootstrap.servers", nodeInfo.getHOST_IP());properties.put("connections.max.idle.ms", 10000);.
·
AdminClient client = null;
try {
Properties properties = new Properties();
properties.put("bootstrap.servers", nodeInfo.getHOST_IP());
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);
client = KafkaAdminClient.create(properties);
ListTopicsResult topics = client.listTopics();//获取所有主题列表
DescribeClusterResult iii = client.describeCluster(); //获取集群信息
Set<String> names = topics.names().get();
logger.info("connect to kafka cluster success");
} catch (InterruptedException e) {
// Kafka is not available
logger.info("======= ClusterNodeMonitorJob: kafka:" + nodeInfo.getHOST_IP() + "连接异常---" + e.getMessage());
String sysOwn = "kafka:" + nodeInfo.getHOST_IP();
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
java.util.Date now = new Date();
String opTime = sdf.format(now);
String smsContext = "【重要】" + opTime +"检测实时数据kafka服务器:" + "hostIpOfEspMain" + "节点状态异常 ,请审核";
SmsUtil.sendSms(opTime, sysOwn, smsContext);
logger.info(" ==================== 节点状态异常: " + smsContext);
} catch (ExecutionException e) {
// Kafka is not available
logger.info("======= ClusterNodeMonitorJob: kafka:" + nodeInfo.getHOST_IP() + "连接异常---" + e.getMessage());
String sysOwn = "kafka:" + nodeInfo.getHOST_IP();
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
java.util.Date now = new Date();
String opTime = sdf.format(now);
String smsContext = "【重要】" + opTime +"检测实时数据kafka服务器:" + "hostIpOfEspMain" + "节点状态异常 ,请审核";
SmsUtil.sendSms(opTime, sysOwn, smsContext);
logger.info(" ==================== 节点状态异常: " + smsContext);
} finally {
if (client != null) {
try {
client.close();
logger.info("======= ClusterNodeMonitorJob: kafka:" + nodeInfo.getHOST_IP() + "关闭正常");
} catch (Exception e) {
logger.info("======= ClusterNodeMonitorJob: kafka:" + nodeInfo.getHOST_IP() + "关闭异常:"+e.getMessage());
}
} else {
logger.info("======= ClusterNodeMonitorJob: kafka:" + nodeInfo.getHOST_IP() + "null异常");
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)