Springboot配置kafka无服务正常启动
SpringBoot配置kafka服务,如果服务异常会导致系统无法正常启动,解决该问题可以应用Spring的@Conditional注解为Kafka的配置类添加条件注解,具体实现如下:import org.springframework.beans.factory.support.BeanDefinitionRegistry;import org.springframework.context.a
·
SpringBoot配置kafka服务,如果服务异常会导致系统无法正常启动,解决该问题可以应用Spring的@Conditional注解为Kafka的配置类添加条件注解,具体实现如下:
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
/**
* kafka 动态启动
* kafka代理服务器正常时启动kafka服务
* kafka代理服务器不可用时,不启动kafka服务
*/
public class MyCondition implements Condition {
private static Boolean a = null;
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
if (true) {
return false;
}
if (a != null && a.booleanValue()) {
return true;
} else if (a != null && !a.booleanValue()) {
return false;
}
//1、能获取到ioc使用的beanfactory
// ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
//2、获取类加载器
// ClassLoader classLoader = context.getClassLoader();
//3、获取当前环境信息
Environment environment = context.getEnvironment();
//4、获取到bean定义的注册类
BeanDefinitionRegistry registry = context.getRegistry();
// String property = environment.getProperty("os.name");
//此处也可以获取到配置文件的信息,比如获取配置文件中的端口
String ipPort = environment.getProperty("spring.kafka.bootstrap-servers");
URI uri = URI.create("http://" + ipPort);
String host = uri.getHost();
int port1 = uri.getPort();
boolean b = this.isHostConnectable(host, port1);
if (b) {
a = new Boolean(true);
} else {
a = new Boolean(false);
}
return b;
}
/**
* 判断kafka服务器,能否正常连接
*
* @param host
* @param port
* @return
*/
public boolean isHostConnectable(String host, int port) {
Socket socket = new Socket();
try {
//判断kafka网络是否能联通,不能连通则返回false
socket.connect(new InetSocketAddress(host, port), 2000);
} catch (IOException e) {
e.printStackTrace();
return false;
} finally {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return true;
}
}
第二步:在所有与kafka相关的配置类中添加注解,@Conditional(MyCondition.Class)
@Conditional保证在符合MyCondition中match方法时才能创建对应bean,只有验证服务可以访问通过的情况下,容器才能初始化Kafka相关类。
更多推荐
已为社区贡献3条内容
所有评论(0)