SpringBoot配置kafka服务,如果服务异常会导致系统无法正常启动,解决该问题可以应用Spring的@Conditional注解为Kafka的配置类添加条件注解,具体实现如下:

import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

/**
 * kafka 动态启动
 * kafka代理服务器正常时启动kafka服务
 * kafka代理服务器不可用时,不启动kafka服务
 */
public class MyCondition implements Condition {
    private static Boolean a = null;
    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        if (true) {
            return false;
        }
        if (a != null && a.booleanValue()) {
            return true;
        } else if (a != null && !a.booleanValue()) {
            return false;
        }
        //1、能获取到ioc使用的beanfactory
//        ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
        //2、获取类加载器
//        ClassLoader classLoader = context.getClassLoader();
        //3、获取当前环境信息
        Environment environment = context.getEnvironment();
        //4、获取到bean定义的注册类
        BeanDefinitionRegistry registry = context.getRegistry();
//        String property = environment.getProperty("os.name");
        //此处也可以获取到配置文件的信息,比如获取配置文件中的端口
        String ipPort = environment.getProperty("spring.kafka.bootstrap-servers");
        URI uri = URI.create("http://" + ipPort);
        String host = uri.getHost();
        int port1 = uri.getPort();
        boolean b = this.isHostConnectable(host, port1);
        if (b) {
            a = new Boolean(true);
        } else {
            a = new Boolean(false);
        }
        return b;
    }

    /**
     * 判断kafka服务器,能否正常连接
     *
     * @param host
     * @param port
     * @return
     */
    public boolean isHostConnectable(String host, int port) {
        Socket socket = new Socket();
        try {
            //判断kafka网络是否能联通,不能连通则返回false
            socket.connect(new InetSocketAddress(host, port), 2000);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return true;
    }
}

第二步:在所有与kafka相关的配置类中添加注解,@Conditional(MyCondition.Class)

@Conditional保证在符合MyCondition中match方法时才能创建对应bean,只有验证服务可以访问通过的情况下,容器才能初始化Kafka相关类。

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐