基于kafka+logback的异步持久化日志
最近一直在忙着攻克个人开源的用户中心中异步日志的功能,今天终于完成了这个困扰我许久的问题。服务器kafka如何配置当我们没有修改配置时,会遇到生产者连接服务器失败的情况advertised.host.name=47.94.248.38我们需要把host设置为当前服务器的iplogback配置<?xml version="1.0" encoding="U.
·
最近一直在忙着攻克个人开源的用户中心中异步日志的功能,今天终于完成了这个困扰我许久的问题。
服务器kafka如何配置
当我们没有修改配置时,会遇到生产者连接服务器失败的情况
advertised.host.name=47.94.248.38
我们需要把host设置为当前服务器的ip
logback配置
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="CONSOLE_LOG_PATTERN"
value="%date{yyyy-MM-dd HH:mm:ss} | %highlight(%-5level) | %boldGreen(%msg%n)"/>
<appender name="KAFKA" class="com.stalary.usercenter.config.KafkaAppender">
</appender>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="KAFKA"/>
<appender-ref ref="console"/>
</root>
// 关闭每次显示kafka设置信息
<logger name="org.apache.kafka" level="OFF"/>
</configuration>
kafka配置
首先我们需要设置一个formatter,即对消息的格式化
public interface Formatter {
String format(ILoggingEvent event);
}
实现类
public class MessageFormatter implements Formatter {
@Override
public String format(ILoggingEvent event) {
if (event.getFormattedMessage().startsWith(UCUtil.USER_LOG)) {
return event.getLevel().toString() + UCUtil.SPLIT + event.getFormattedMessage();
} else {
return null;
}
}
}
然后我们要实现一个KafkaAppender,即消息追加类
@Slf4j
public class KafkaAppender extends AppenderBase<ILoggingEvent> {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@Resource
private Formatter formatter;
@Override
public void start() {
if (this.formatter == null) {
this.formatter = new MessageFormatter();
}
super.start();
}
@Override
public void stop() {
super.stop();
}
@Override
protected void append(ILoggingEvent event) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://47.94.248.38:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, String>(props));
String logStr = this.formatter.format(event);
if (logStr != null) {
kafkaTemplate.send(Consumer.LOG, logStr);
log.info("send message: topic: " + Consumer.LOG + " message: " + logStr);
}
}
}
这里遇到了一个坑啊,博主直接使用了自己写好的生产者不知道为什么无法发送消息,所以新申请一个KafkaTemplate
ILoggingEvent 则可以进行捕获日志。
下面来看一下消费者
@Slf4j
@Component
public class Consumer {
public static final String LOGIN_STAT = "login_stat";
public static final String LOG = "center_log";
@Resource
private Gson gson;
@Resource
private StatService statService;
@Resource
private LogService logService;
@KafkaListener(topics = {LOGIN_STAT, LOG})
public void process(ConsumerRecord record) {
long startTime = System.currentTimeMillis();
String topic = record.topic();
String key = "";
if (record.key() != null) {
key = record.key().toString();
}
String message = record.value().toString();
if (LOGIN_STAT.equals(topic)) {
UserStat userStat = gson.fromJson(message, UserStat.class);
statService.saveUserStat(userStat);
} else if (LOG.equals(topic)) {
String[] split = message.split(UCUtil.SPLIT);
String level = split[0];
String type = split[2];
Long commonId = Long.valueOf(split[3]);
String content = split[4];
// 异步存储日志
Log oldLog = logService.findOldLog(commonId, type, content);
if (oldLog != null) {
oldLog.setCount(oldLog.getCount() + 1);
logService.save(oldLog);
} else {
Log log = new Log(level, content, type, commonId, 1);
logService.save(log);
}
}
long endTime = System.currentTimeMillis();
log.info("SubmitConsumer.time=" + (endTime - startTime));
}
}
Log实体
@Data
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
@Table(name = "log")
@Entity
public class Log extends BaseEntity {
/**
* 日志等级
*/
private String level;
/**
* 日志内容
*/
private String content;
/**
* 日志种类(user,project)
*/
private String type;
/**
* 通用id
*/
private Long commonId;
/**
* 次数
*/
private Integer count = 1;
}
以上就是完整的异步日志
大家觉得好用可以给我的github赏点星星哦~
你所能想到的,就能实现,技术是没有界限的
更多推荐
已为社区贡献1条内容
所有评论(0)