最近一直在忙着攻克个人开源的用户中心中异步日志的功能,今天终于完成了这个困扰我许久的问题。

服务器kafka如何配置

当我们没有修改配置时,会遇到生产者连接服务器失败的情况

advertised.host.name=47.94.248.38

我们需要把host设置为当前服务器的ip

logback配置

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <property name="CONSOLE_LOG_PATTERN"
              value="%date{yyyy-MM-dd HH:mm:ss} | %highlight(%-5level) | %boldGreen(%msg%n)"/>
    <appender name="KAFKA" class="com.stalary.usercenter.config.KafkaAppender">
    </appender>

    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
        </encoder>
    </appender>
    <root level="info">
        <appender-ref ref="KAFKA"/>
        <appender-ref ref="console"/>
    </root>
    // 关闭每次显示kafka设置信息
    <logger name="org.apache.kafka" level="OFF"/>
</configuration>

kafka配置

首先我们需要设置一个formatter,即对消息的格式化

public interface Formatter {

    String format(ILoggingEvent event);
}

实现类

public class MessageFormatter implements Formatter {
    @Override
    public String format(ILoggingEvent event) {
        if (event.getFormattedMessage().startsWith(UCUtil.USER_LOG)) {
            return event.getLevel().toString() + UCUtil.SPLIT + event.getFormattedMessage();
        } else {
            return null;
        }
    }
}

然后我们要实现一个KafkaAppender,即消息追加类

@Slf4j
public class KafkaAppender extends AppenderBase<ILoggingEvent> {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Resource
    private Formatter formatter;

    @Override
    public void start() {
        if (this.formatter == null) {
            this.formatter = new MessageFormatter();
        }
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
    }

    @Override
    protected void append(ILoggingEvent event) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://47.94.248.38:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, String>(props));
        String logStr = this.formatter.format(event);
        if (logStr != null) {
            kafkaTemplate.send(Consumer.LOG, logStr);
            log.info("send message: topic: " + Consumer.LOG + " message: " + logStr);
        }
    }
}

这里遇到了一个坑啊,博主直接使用了自己写好的生产者不知道为什么无法发送消息,所以新申请一个KafkaTemplate

ILoggingEvent 则可以进行捕获日志。

下面来看一下消费者

@Slf4j
@Component
public class Consumer {

    public static final String LOGIN_STAT = "login_stat";

    public static final String LOG = "center_log";

    @Resource
    private Gson gson;

    @Resource
    private StatService statService;

    @Resource
    private LogService logService;

    @KafkaListener(topics = {LOGIN_STAT, LOG})
    public void process(ConsumerRecord record) {
        long startTime = System.currentTimeMillis();
        String topic = record.topic();
        String key = "";
        if (record.key() != null) {
            key = record.key().toString();
        }
        String message = record.value().toString();
        if (LOGIN_STAT.equals(topic)) {
            UserStat userStat = gson.fromJson(message, UserStat.class);
            statService.saveUserStat(userStat);
        } else if (LOG.equals(topic)) {
            String[] split = message.split(UCUtil.SPLIT);
            String level = split[0];
            String type = split[2];
            Long commonId = Long.valueOf(split[3]);
            String content = split[4];
            // 异步存储日志
            Log oldLog = logService.findOldLog(commonId, type, content);
            if (oldLog != null) {
                oldLog.setCount(oldLog.getCount() + 1);
                logService.save(oldLog);
            } else {
                Log log = new Log(level, content, type, commonId, 1);
                logService.save(log);
            }
        }
        long endTime = System.currentTimeMillis();
        log.info("SubmitConsumer.time=" + (endTime - startTime));
    }
}

Log实体

@Data
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
@Table(name = "log")
@Entity
public class Log extends BaseEntity {

    /**
     * 日志等级
     */
    private String level;

    /**
     * 日志内容
     */
    private String content;

    /**
     * 日志种类(user,project)
     */
    private String type;

    /**
     * 通用id
     */
    private Long commonId;

    /**
     * 次数
     */
    private Integer count = 1;
}

以上就是完整的异步日志

用户中心地址

大家觉得好用可以给我的github赏点星星哦~

你所能想到的,就能实现,技术是没有界限的

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐