Springboot + kafka + ELK 日志存储 外网 流量 过大 iftop 流量分析
iftop 日志分析
·
Springboot + kafka + ELK 的部署和使用请另行查找。
这里只记录一下外网流量过大的问题。
环境:
腾讯云 服务器
内存型M5 | 4核 32GB 5Mbps |
应用服务器:
kafka , ELK , 游戏服务器日志生成
问题:
由于这是一个测试服务器,所以流量并没有开很大,5M 也是够用了。
在测试服务器中,4,5 个玩家正常的数据操作 会导致 网络很卡。
使用 腾讯云 服务器 流量监控 查看 ,显示 流量被占用完。
使用 iftop (如何安装请另行查找) 查看时,发现占用流量的是 logstash 和 kafka 相互之间的通讯(没有留下图)
修改 所有的 连接 ,使用 内网IP,localhost,127.0.0.1 都没有改变。
找到 不同点 ,最后发现 kafka 使用的是 docker 安装。ELK 和 游戏 使用的是原生安装。
使用 原生 安装 kafka 后,重启 所有服务,使用 iftop 查看 ,没有 logstash 和 kafka 使用外网流量
使用 java 调用 iftop 记录到日志中并分析
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
/**
 * Runs {@code iftop} in text mode as a child process and copies everything it prints
 * into the application log (and the console) so the traffic can be analysed later.
 *
 * <p>Created 2022/2/11.
 *
 * @author laowang
 */
public class ExecIfTop {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExecIfTop.class);

    /**
     * The iftop command line, already split into arguments so that no shell-style
     * tokenizing of a single command string is involved.
     */
    private static final String[] IFTOP_COMMAND = {
        "iftop", "-i", "eth0", "-n", "-N", "-P", "-p", "-t", "-L", "50"
    };

    /**
     * Writes the id of the current JVM process to {@code server.pid} and then starts
     * iftop on a background thread. The method returns as soon as the thread is started.
     *
     * @throws IOException if {@code server.pid} cannot be written
     */
    public void exec() throws IOException {
        // The runtime MXBean name is split on '@' and the first part is stored as the pid.
        String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        try (FileWriter writer = new FileWriter("server.pid")) {
            writer.write(pid);
        }
        new Thread(this::runIfTop, "iftop-runner").start();
    }

    /** Starts iftop, attaches one reader thread per output stream and waits for it to exit. */
    private void runIfTop() {
        try {
            Process process = new ProcessBuilder(IFTOP_COMMAND).start();
            new ReaderInfoThread(process.getInputStream()).start();
            new ReaderErrorThread(process.getErrorStream()).start();
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                LOGGER.error("cmd 执行失败: exitCode={}", exitCode);
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error("执行错误:", e);
        } catch (Exception e) {
            // Thread boundary: log once instead of letting the thread die silently.
            LOGGER.error("执行错误:", e);
        }
    }

    /**
     * Reads {@code stream} line by line until end of stream and forwards every line to
     * the logger and to the matching console stream. The stream is closed afterwards.
     *
     * @param stream      the process stream to consume
     * @param errorStream {@code true} to forward at ERROR level / {@code System.err},
     *                    {@code false} to forward at INFO level / {@code System.out}
     */
    private static void drain(InputStream stream, boolean errorStream) {
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (errorStream) {
                    LOGGER.error(line);
                    System.err.println(line);
                } else {
                    LOGGER.info(line);
                    System.out.println(line);
                }
            }
        } catch (Exception e) {
            LOGGER.error("读取 iftop 输出失败:", e);
        }
    }

    /** Forwards the standard output of the child process to the INFO log and stdout. */
    public static class ReaderInfoThread extends Thread {
        final InputStream is;

        /**
         * @param is the standard output stream of the child process
         */
        public ReaderInfoThread(InputStream is) {
            this.is = is;
        }

        @Override
        public void run() {
            drain(is, false);
        }
    }

    /** Forwards the standard error of the child process to the ERROR log and stderr. */
    public static class ReaderErrorThread extends Thread {
        final InputStream is;

        /**
         * @param is the standard error stream of the child process
         */
        public ReaderErrorThread(InputStream is) {
            this.is = is;
        }

        @Override
        public void run() {
            drain(is, true);
        }
    }
}
package com.wgq.iftop;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
/**
 * Parses iftop text-mode output captured in {@code info*.log} files and writes two
 * per-connection rankings (inbound and outbound cumulative traffic) to {@code IfTop.xls}.
 *
 * <p>Parsing state is kept in instance fields without synchronization, so one instance
 * must not be used from several threads at once.
 *
 * <p>Created 2022/2/11.
 *
 * @author laowang
 */
public class AnalysisIfTop {
    /** Date format used in the names of rolled log files. */
    private static final DateTimeFormatter dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    /** Name of the log file that is still being written; it is always processed last. */
    private static final String CURRENT_LOG_NAME = "info.log";
    /**
     * Number of leading characters stripped from every log line before parsing.
     * NOTE(review): 13 matches a {@code "HH:mm:ss.SSS "} prefix; confirm against the log pattern in use.
     */
    private static final int LOG_PREFIX_LENGTH = 13;
    /** Tokens in a well-formed "=>" line: index, host, arrow and four traffic columns. */
    private static final int OUTPUT_MIN_TOKENS = 7;
    /** Tokens in a well-formed "<=" line: host, arrow and four traffic columns. */
    private static final int INPUT_MIN_TOKENS = 6;
    /** Size units ordered from largest to smallest; the level is {@code length - index}. */
    private static final String[] UNITS = {"GB", "MB", "KB", "B"};

    /** Number of "-----" separator lines seen since the last "=======" line. */
    private Count count = new Count();
    /** Connection whose "=>" line has been read and whose "<=" line is still expected. */
    private ConnectInfo connectInfo = null;
    /** Latest sample per connection, keyed by {@link ConnectInfo#getKey()}. */
    private HashMap<String, ConnectInfo> connectMap = new HashMap<>();

    /**
     * Reads every regular file whose name starts with {@code info} in the given directory,
     * oldest first, and exports the collected connections to {@code IfTop.xls}.
     * Does nothing when the path does not exist, is not a directory, or holds no such file.
     *
     * @param path directory containing the log files
     */
    public void exec(String path) {
        // listFiles returns null for a missing path or a non-directory; treat that as "nothing to do".
        File[] logFiles = new File(path).listFiles(
                (File candidate) -> candidate.isFile() && candidate.getName().startsWith("info"));
        if (logFiles == null || logFiles.length == 0) {
            return;
        }
        this.count = new Count();
        this.connectMap = new HashMap<>();
        this.connectInfo = null;
        // Parsing state is carried from one file to the next, so the files must be in order.
        Arrays.sort(logFiles, new FileNameComparator());
        for (File logFile : logFiles) {
            System.err.println(logFile.getName());
            readFile(logFile);
        }
        saveToExcel(new ArrayList<>(this.connectMap.values()));
    }

    /** Feeds every line of {@code file} to {@link #analysis(String)}; failures skip the rest of the file. */
    private void readFile(File file) {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                analysis(line);
            }
        } catch (java.io.IOException | RuntimeException e) {
            // Best effort: report the problem and continue with the next file.
            System.err.println("Failed to process " + file);
            e.printStackTrace();
        }
    }

    /**
     * Interprets one log line. A line containing "=======" resets the separator counter,
     * a line containing "-----" increments it, and while the counter is 1 the "=>" / "<="
     * line pairs are collected as connections.
     */
    private void analysis(String line) {
        if (line.length() < LOG_PREFIX_LENGTH) {
            // Too short to carry the prefix (for example a continuation line); nothing to parse.
            return;
        }
        String content = line.substring(LOG_PREFIX_LENGTH);
        if (content.contains("=======")) {
            doFinish();
        } else if (content.contains("-----")) {
            doStart();
        } else if (count.get() == 1) {
            if (content.contains("=>")) {
                // Outbound half of a pair; keep it until the inbound half arrives.
                connectInfo = doOutput(content);
            } else if (content.contains("<=")) {
                ConnectInfo pending = connectInfo;
                connectInfo = null;
                ConnectInfo completed = doInput(content, pending);
                if (completed == null) {
                    // No preceding "=>" line, or a malformed line: drop the pair.
                    return;
                }
                ConnectInfo known = connectMap.putIfAbsent(completed.getKey(), completed);
                if (known != null) {
                    known.merge(completed);
                }
            }
        }
    }

    /**
     * Parses an outbound ("=>") line.
     *
     * @return a new connection with the outbound side filled in, or {@code null} if the
     *         line has fewer columns than expected
     */
    private ConnectInfo doOutput(String line) {
        String[] values = tokenize(line);
        if (values.length < OUTPUT_MIN_TOKENS) {
            return null;
        }
        ConnectInfo info = new ConnectInfo();
        info.setOut(values[1], new FlowInfo(values[3], values[4], values[5], values[6]));
        return info;
    }

    /**
     * Parses an inbound ("<=") line and completes {@code info} with it.
     *
     * @return {@code info}, or {@code null} if {@code info} is {@code null} or the line
     *         has fewer columns than expected
     */
    private ConnectInfo doInput(String line, ConnectInfo info) {
        if (info == null) {
            return null;
        }
        String[] values = tokenize(line);
        if (values.length < INPUT_MIN_TOKENS) {
            return null;
        }
        info.setInput(values[0], new FlowInfo(values[2], values[3], values[4], values[5]));
        return info;
    }

    /** Splits a line into its whitespace-separated columns, ignoring leading and repeated blanks. */
    private String[] tokenize(String line) {
        return line.trim().split("\\s+");
    }

    /** Called for every "-----" separator line: moves on to the next section. */
    private void doStart() {
        count.add();
    }

    /** Called for every "=======" line: the current block is complete. */
    private void doFinish() {
        count.set(0);
    }

    /**
     * Splits a size such as {@code "1.5MB"} into its number and a unit level
     * (GB = 4, MB = 3, KB = 2, B = 1). Values without a known unit, or whose numeric
     * part cannot be parsed, get level 0 and amount 0.
     */
    private CumulativeLevel getLevel(String value) {
        for (int i = 0; i < UNITS.length; i++) {
            String unit = UNITS[i];
            if (value.contains(unit)) {
                String number = value.replace(unit, "").trim();
                try {
                    return new CumulativeLevel(Double.parseDouble(number), UNITS.length - i);
                } catch (NumberFormatException e) {
                    // An unparseable value is ranked like an unknown one instead of breaking the sort.
                    return new CumulativeLevel(0, 0);
                }
            }
        }
        return new CumulativeLevel(0, 0);
    }

    /**
     * Compares two size strings so that the larger one sorts first.
     *
     * @param o1 first size, for example {@code "1.5MB"}
     * @param o2 second size
     * @return a negative number if {@code o1} is larger, a positive number if {@code o2}
     *         is larger, 0 if they are equal
     */
    public int doCompare(String o1, String o2) {
        CumulativeLevel l1 = getLevel(o1);
        CumulativeLevel l2 = getLevel(o2);
        if (l1.getL() != l2.getL()) {
            // Levels are limited to 0..4, so the subtraction cannot overflow.
            return l2.getL() - l1.getL();
        }
        if (l1.getC() > l2.getC()) {
            return -1;
        }
        if (l1.getC() < l2.getC()) {
            return 1;
        }
        return 0;
    }

    /**
     * Writes the connections to {@code IfTop.xls}: one sheet sorted by inbound and one
     * sorted by outbound cumulative traffic. I/O failures are reported on stderr.
     */
    private void saveToExcel(List<ConnectInfo> connectInfos) {
        try (Workbook wb = new HSSFWorkbook()) {
            // Inbound ranking.
            Sheet inbound = wb.createSheet("入网");
            connectInfos.sort(new InputComparator());
            createSheet(inbound, connectInfos, ConnectInfo::getInputCumulative);
            // Outbound ranking.
            Sheet outbound = wb.createSheet("出网");
            connectInfos.sort(new OutputComparator());
            createSheet(outbound, connectInfos, ConnectInfo::getOutCumulative);
            // The file is opened only after the workbook has been built in memory.
            try (FileOutputStream out = new FileOutputStream(new File("IfTop.xls"))) {
                wb.write(out);
            }
        } catch (java.io.IOException e) {
            System.err.println("Failed to write IfTop.xls");
            e.printStackTrace();
        }
    }

    /** Fills a sheet with a header row followed by one row per connection. */
    private void createSheet(Sheet sheet, List<ConnectInfo> connectInfos, Function<ConnectInfo, String> function) {
        Row header = sheet.createRow(0);
        header.createCell(0).setCellValue("IP:PORT");
        header.createCell(1).setCellValue("IP:PORT");
        header.createCell(2).setCellValue("总流量");
        int rowIndex = 1;
        for (ConnectInfo info : connectInfos) {
            Row row = sheet.createRow(rowIndex++);
            row.createCell(0).setCellValue(info.getValue1());
            row.createCell(1).setCellValue(info.getValue2());
            row.createCell(2).setCellValue(function.apply(info));
        }
    }

    /** One connection: the host read from the "<=" line, the host read from the "=>" line, and both flows. */
    class ConnectInfo {
        /** Host column of the "<=" line. */
        private String value1;
        /** Host column of the "=>" line. */
        private String value2;
        private FlowInfo input;
        private FlowInfo out;

        /** Key identifying the connection: both host columns joined by a space. */
        public String getKey() {
            return value1 + " " + value2;
        }

        public void setInput(String value1, FlowInfo input) {
            this.value1 = value1;
            this.input = input;
        }

        public void setOut(String value2, FlowInfo out) {
            this.out = out;
            this.value2 = value2;
        }

        /**
         * Takes over the flows of a newer sample of the same connection.
         * The values are replaced, not added (presumably because the cumulative column
         * is already a running total; confirm against the iftop documentation).
         */
        public void merge(ConnectInfo connectInfo) {
            this.input = connectInfo.input;
            this.out = connectInfo.out;
        }

        public String getInputCumulative() {
            return input.getCumulative();
        }

        public String getOutCumulative() {
            return out.getCumulative();
        }

        public String getValue1() {
            return value1;
        }

        public String getValue2() {
            return value2;
        }

        @Override
        public String toString() {
            return "{" +
                    "input=" + input +
                    ", out=" + out +
                    '}';
        }
    }

    /** The four traffic columns of one line, kept as the raw strings read from the log. */
    class FlowInfo {
        private final String last2;
        private final String last10;
        private final String last40;
        private final String cumulative;

        FlowInfo(String last2, String last10, String last40, String cumulative) {
            this.last2 = last2;
            this.last10 = last10;
            this.last40 = last40;
            this.cumulative = cumulative;
        }

        public String getLast2() {
            return last2;
        }

        public String getLast10() {
            return last10;
        }

        public String getLast40() {
            return last40;
        }

        public String getCumulative() {
            return cumulative;
        }

        @Override
        public String toString() {
            return "{" +
                    "last2='" + last2 + '\'' +
                    ", last10='" + last10 + '\'' +
                    ", last40='" + last40 + '\'' +
                    ", cumulative='" + cumulative + '\'' +
                    '}';
        }
    }

    /** Orders connections by inbound cumulative traffic, largest first. */
    class InputComparator implements Comparator<ConnectInfo> {
        @Override
        public int compare(ConnectInfo o1, ConnectInfo o2) {
            return doCompare(o1.getInputCumulative(), o2.getInputCumulative());
        }
    }

    /** Orders connections by outbound cumulative traffic, largest first. */
    class OutputComparator implements Comparator<ConnectInfo> {
        @Override
        public int compare(ConnectInfo o1, ConnectInfo o2) {
            return doCompare(o1.getOutCumulative(), o2.getOutCumulative());
        }
    }

    /** A numeric amount together with the level of its unit. */
    class CumulativeLevel {
        private final double c;
        private final int l;

        CumulativeLevel(double c, int l) {
            this.c = c;
            this.l = l;
        }

        /** @return the numeric part of the size */
        public double getC() {
            return c;
        }

        /** @return the unit level (0 = unknown, 1 = B, 2 = KB, 3 = MB, 4 = GB) */
        public int getL() {
            return l;
        }
    }

    /**
     * Parses the date/index part of a rolled log file name.
     *
     * @return the parsed value, or {@code null} if the name does not have the
     *         {@code info-yyyy-MM-dd.N.log} shape
     */
    private DateIndex parseDateIndex(String fileName) {
        // Literal replacement: in a regex, the '.' of ".log" would match any character.
        String stem = fileName.replace("info-", "").replace(".log", "");
        try {
            return new DateIndex(stem);
        } catch (RuntimeException ignored) {
            // Not a rolled log file name; the caller decides where such files are sorted.
            return null;
        }
    }

    /**
     * Orders log files chronologically: files whose name cannot be parsed come first
     * (by name), then rolled files by date and index, and {@code info.log} last.
     */
    class FileNameComparator implements Comparator<File> {
        @Override
        public int compare(File o1, File o2) {
            boolean current1 = CURRENT_LOG_NAME.equals(o1.getName());
            boolean current2 = CURRENT_LOG_NAME.equals(o2.getName());
            if (current1 || current2) {
                return Boolean.compare(current1, current2);
            }
            DateIndex v1 = parseDateIndex(o1.getName());
            DateIndex v2 = parseDateIndex(o2.getName());
            if (v1 == null || v2 == null) {
                if (v1 == null && v2 == null) {
                    return o1.getName().compareTo(o2.getName());
                }
                return v1 == null ? -1 : 1;
            }
            int dateCompare = v1.getDate().compareTo(v2.getDate());
            if (dateCompare != 0) {
                return dateCompare;
            }
            return Integer.compare(v1.getIndex(), v2.getIndex());
        }
    }

    /** Date and running index taken from a rolled log file name. */
    class DateIndex {
        private final LocalDate date;
        private final int index;

        /**
         * @param input text of the form {@code yyyy-MM-dd.N}
         * @throws RuntimeException if the text does not have that form
         */
        public DateIndex(String input) {
            String[] values = input.split("[.]");
            this.date = LocalDate.parse(values[0], dateFormat);
            this.index = Integer.parseInt(values[1]);
        }

        public LocalDate getDate() {
            return date;
        }

        public int getIndex() {
            return index;
        }
    }

    /** Simple mutable counter. */
    class Count {
        private int count;

        public int get() {
            return count;
        }

        public void add() {
            this.count++;
        }

        public void set(int v) {
            this.count = v;
        }
    }
}
application.properties
game.logs.path=logs
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<contextName>logback</contextName>
<springProperty scope="context" name="app_name" source="spring.application.name"/>
<springProperty scope="context" name="log.path" source="game.logs.path"/>
<!-- 控制台输出 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %msg%n </pattern>
</encoder>
</appender>
<!-- 不带过滤器,能记录所有级别的日志 -->
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${log.path}/info.log</File>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 滚动策略在每次往日志中添加新内容时检查:日期变化时(每天对应一组日志文件),就将
info.log 归档到 ${log.path} 目录并更名为 info-2017-11-22.0.log,然后重新生成 info.log;
另一种生成新文件的条件是,info.log 大小大于 maxFileSize 时,如果当天已经有归档文件了,
则序号 %i 加 1(如 info-2017-11-22.1.log)。文件名格式见下方 fileNamePattern。 -->
<fileNamePattern>${log.path}/info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>1MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<maxHistory>60</maxHistory>
<totalSizeCap>20GB</totalSizeCap>
</rollingPolicy>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{HH:mm:ss.SSS} %msg%n </pattern>
</layout>
</appender>
<!-- error级别的文件输出 -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level>
</filter>
<File>${log.path}/error.log</File>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>1MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<maxHistory>60</maxHistory>
<totalSizeCap>20GB</totalSizeCap>
</rollingPolicy>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{HH:mm:ss.SSS} %msg%n </pattern>
</layout>
</appender>
<root level="info">
<appender-ref ref="STDOUT" />
<appender-ref ref="INFO_FILE" />
<appender-ref ref="ERROR_FILE" />
</root>
<springProfile name="dev">
<logger name="com" level="DEBUG" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
</springProfile>
<springProfile name="test,prod">
<logger name="com" level="INFO" additivity="false">
<appender-ref ref="INFO_FILE" />
<appender-ref ref="ERROR_FILE" />
</logger>
</springProfile>
</configuration>
pom.xml 引用 poi 生成 excel
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>5.2.0</version>
</dependency>
打包 生成 iftop-1.0.0.jar 运行
nohup /data/java/bin/java -jar -Xms512m -Xmx512m -server iftop-1.0.0.jar --spring.profiles.active=prod --spring.name=iftop 1>>logs/out.log 2>>logs/err.log & tail logs/out.log -f logs/err.log -f
更多推荐
已为社区贡献2条内容
所有评论(0)