Springboot + kafka + ELK 的部署和使用请另行查找。

这里只记录一下外网流量过大的问题。

环境:

腾讯云 服务器

内存型M5

4核 32GB 5Mbps

应用服务器:

kafka , ELK , 游戏服务器日志生成

问题:

由于这是一个测试服务器,所以带宽并没有开很大,5M 也是够用了。

在测试服务器中,4,5 个玩家正常的数据操作 会导致 网络很卡。

使用 腾讯云 服务器 流量监控 查看 ,显示 流量被占用完。

使用 iftop(如何安装请另行查找)查看时,发现占用流量的是 logstash 和 kafka 相互之间的通讯(没有留下截图)。

修改所有的连接地址,分别改用内网 IP、localhost、127.0.0.1,情况都没有改变。

找到 不同点 ,最后发现 kafka 使用的是 docker 安装。ELK 和 游戏 使用的是原生安装。

使用 原生 安装 kafka 后,重启 所有服务,使用 iftop 查看 ,没有 logstash 和 kafka 使用外网流量

使用 java 调用 iftop 记录到日志中并分析


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * Runs {@code iftop} as a child process and copies everything it prints into the log.
 *
 * <p>Lines from the child's stdout are logged at INFO and echoed to {@code System.out};
 * lines from its stderr are logged at ERROR and echoed to {@code System.err}.
 *
 * @author: laowang
 * @date: 2022/2/11
 */
public class ExecIfTop {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExecIfTop.class);

    /**
     * Command line, one array element per argument, so no shell-style tokenising is involved.
     * NOTE(review): {@code -t} is expected to select iftop's non-interactive text output;
     * see iftop(8) for the meaning of the remaining flags.
     */
    private static final String[] IFTOP_COMMAND = {
        "iftop", "-i", "eth0", "-n", "-N", "-P", "-p", "-t", "-L", "50"
    };

    /**
     * Writes this JVM's process id to {@code server.pid} and starts iftop on a background thread.
     *
     * @throws IOException if {@code server.pid} cannot be written
     */
    public void exec() throws IOException {
        // ProcessHandle gives the pid directly instead of parsing the runtime MXBean name.
        String pid = Long.toString(ProcessHandle.current().pid());
        try (FileWriter writer = new FileWriter("server.pid")) {
            writer.write(pid);
        }
        new Thread(ExecIfTop::runIfTop, "iftop-runner").start();
    }

    /** Starts iftop, attaches the two reader threads and waits for the process to exit. */
    private static void runIfTop() {
        try {
            Process process = new ProcessBuilder(IFTOP_COMMAND).start();
            new ReaderInfoThread(process.getInputStream()).start();
            new ReaderErrorThread(process.getErrorStream()).start();
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                // ERROR events also reach the unfiltered info appender, so one call is enough.
                LOGGER.error("cmd 执行失败:{}", exitCode);
            }
        } catch (InterruptedException e) {
            // Restore the flag so whoever owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error("执行错误:", e);
        } catch (IOException e) {
            LOGGER.error("执行错误:", e);
        }
    }

    /**
     * Reads {@code stream} line by line (UTF-8) until end of stream, logging and echoing each line.
     *
     * @param stream  the child process stream to drain; closed when this method returns
     * @param isError {@code true} to log at ERROR and echo to stderr, {@code false} for INFO/stdout
     */
    private static void pump(InputStream stream, boolean isError) {
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (isError) {
                    LOGGER.error("{}", line);
                    System.err.println(line);
                } else {
                    LOGGER.info("{}", line);
                    System.out.println(line);
                }
            }
        } catch (IOException e) {
            LOGGER.error("Failed to read iftop output", e);
        }
    }

    /** Thread that forwards the child's standard output to the INFO log and {@code System.out}. */
    public static class ReaderInfoThread extends Thread {
        final InputStream is;

        /**
         * @param is the stream to drain, normally {@code Process.getInputStream()}
         */
        public ReaderInfoThread(InputStream is) {
            this.is = is;
        }

        @Override
        public void run() {
            pump(is, false);
        }
    }

    /** Thread that forwards the child's standard error to the ERROR log and {@code System.err}. */
    public static class ReaderErrorThread extends Thread {
        final InputStream is;

        /**
         * @param is the stream to drain, normally {@code Process.getErrorStream()}
         */
        public ReaderErrorThread(InputStream is) {
            this.is = is;
        }

        @Override
        public void run() {
            pump(is, true);
        }
    }
}
package com.wgq.iftop;

import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

/**
 * Parses iftop text output that was captured in {@code info*.log} files and writes the
 * cumulative traffic per connection to {@code IfTop.xls}.
 *
 * <p>Every log line is expected to start with a 13-character timestamp prefix. Inside one iftop
 * report a line containing {@code -----} starts the next section and a line containing
 * {@code =======} ends the report; only the section after the first {@code -----} line is parsed.
 * In that section a {@code =>} line followed by a {@code <=} line describes one connection.
 *
 * <p>Diagnostics go to {@code System.err} rather than to a logger so that this tool never writes
 * into the log files it is analysing.
 *
 * <p>Instances keep parser state in fields and are not safe for concurrent use.
 *
 * @author: laowang
 * @date: 2022/2/11
 */
public class AnalysisIfTop {
    /** Date layout used in rolled log file names such as {@code info-2022-02-11.0.log}. */
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /** Number of leading characters ({@code "HH:mm:ss.SSS "}) stripped from every log line. */
    private static final int LOG_PREFIX_LENGTH = 13;

    /** Recognised size suffixes, largest first; the level of {@code UNITS[i]} is {@code 4 - i}. */
    private static final String[] UNITS = {"GB", "MB", "KB", "B"};

    /** Maximum number of rows (header included) a sheet of the legacy .xls format can hold. */
    private static final int MAX_XLS_ROWS = 65536;

    /** Index of the section of the current report: 0 = header, 1 = per-connection lines, ... */
    private Count count = new Count();
    /** Connection whose {@code =>} line has been read and whose {@code <=} line is still pending. */
    private ConnectInfo connectInfo = null;
    /** Latest data per connection, keyed by {@link ConnectInfo#getKey()}. */
    private HashMap<String, ConnectInfo> connectMap = new HashMap<>();

    /**
     * Reads every regular file in {@code path} whose name starts with {@code info}, oldest first,
     * and writes the result to {@code IfTop.xls} in the working directory.
     *
     * <p>Does nothing when {@code path} does not exist, is not a directory, cannot be listed or
     * contains no matching file.
     *
     * @param path directory that holds the log files
     */
    public void exec(String path) {
        // listFiles returns null (instead of throwing) when path is missing or not a directory.
        File[] found = new File(path).listFiles(
                (File candidate) -> candidate.isFile() && candidate.getName().startsWith("info"));
        if (found == null || found.length == 0) {
            return;
        }
        List<File> files = Arrays.asList(found);

        // Reset state so the same instance can be used for several runs.
        this.count = new Count();
        this.connectMap = new HashMap<>();
        this.connectInfo = null;

        files.sort(new FileNameComparator());
        for (File f : files) {
            System.err.println(f.getName());
            readFile(f);
        }
        saveToExcel(new ArrayList<>(this.connectMap.values()));
    }

    /** Feeds every line of {@code file} (decoded as UTF-8) to {@link #analysis(String)}. */
    private void readFile(File file) {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                analysis(line);
            }
        } catch (IOException e) {
            System.err.println("Failed to read " + file + ": " + e);
        }
    }

    /**
     * Processes one raw log line and advances the section state machine.
     *
     * <p>Lines that are shorter than the timestamp prefix or that do not have the expected number
     * of fields are skipped, so one malformed line cannot abort the rest of the file.
     */
    private void analysis(String line) {
        if (line.length() < LOG_PREFIX_LENGTH) {
            return;
        }
        line = line.substring(LOG_PREFIX_LENGTH);
        if (line.contains("=======")) {
            doFinish();
            return;
        }
        if (line.contains("-----")) {
            doStart();
            return;
        }
        if (count.get() != 1) {
            return;
        }
        if (line.contains("=>")) {
            // Outgoing half; kept until the matching incoming line arrives.
            connectInfo = doOutput(line);
        } else if (line.contains("<=")) {
            // Incoming half completes the pair.
            ConnectInfo complete = doInput(line, connectInfo);
            connectInfo = null;
            if (complete == null) {
                return;
            }
            ConnectInfo old = connectMap.get(complete.getKey());
            if (Objects.isNull(old)) {
                connectMap.put(complete.getKey(), complete);
            } else {
                old.merge(complete);
            }
        }
    }

    /**
     * Parses a {@code =>} line: field 1 is the host, fields 3-6 are the four traffic columns.
     *
     * @return a new half-filled {@link ConnectInfo}, or {@code null} if the line has too few fields
     */
    private ConnectInfo doOutput(String line) {
        String[] values = splitFields(line);
        if (values.length < 7) {
            return null;
        }
        ConnectInfo info = new ConnectInfo();
        info.setOut(values[1], new FlowInfo(values[3], values[4], values[5], values[6]));
        return info;
    }

    /**
     * Parses a {@code <=} line: field 0 is the host, fields 2-5 are the four traffic columns.
     *
     * @param info the object created from the preceding {@code =>} line, may be {@code null}
     * @return {@code info} with the incoming half filled in, or {@code null} if there was no
     *     preceding {@code =>} line or the line has too few fields
     */
    private ConnectInfo doInput(String line, ConnectInfo info) {
        String[] values = splitFields(line);
        if (info == null || values.length < 6) {
            return null;
        }
        info.setInput(values[0], new FlowInfo(values[2], values[3], values[4], values[5]));
        return info;
    }

    /** Splits a line into its whitespace-separated fields, ignoring leading/trailing blanks. */
    private static String[] splitFields(String line) {
        return line.trim().split("\\s+");
    }

    /** Called on a {@code -----} line: moves on to the next section of the current report. */
    private void doStart() {
        count.add();
    }

    /** Called on a {@code =======} line: the report is complete, start over at the header. */
    private void doFinish() {
        count.set(0);
    }

    /**
     * Splits a size such as {@code "1.5MB"} into its number and a unit level
     * (B = 1, KB = 2, MB = 3, GB = 4).
     *
     * @return level 0 with value 0 when {@code value} is {@code null}, has no known unit or its
     *     numeric part cannot be parsed
     */
    private static CumulativeLevel getLevel(String value) {
        if (value == null) {
            return new CumulativeLevel(0, 0);
        }
        for (int i = 0; i < UNITS.length; i++) {
            if (!value.contains(UNITS[i])) {
                continue;
            }
            try {
                double number = Double.parseDouble(value.replace(UNITS[i], "").trim());
                return new CumulativeLevel(number, UNITS.length - i);
            } catch (NumberFormatException e) {
                // A garbled field must not blow up the sort; rank it with the unknown values.
                return new CumulativeLevel(0, 0);
            }
        }
        return new CumulativeLevel(0, 0);
    }

    /**
     * Compares two size strings so that the larger one sorts first (descending order).
     *
     * <p>The unit level is compared first and the number only when the units are equal, so the
     * inputs are assumed to be normalised to the largest fitting unit.
     *
     * @param o1 first size, e.g. {@code "12.3MB"}
     * @param o2 second size
     * @return a negative value if {@code o1} is larger, a positive value if {@code o2} is larger,
     *     zero if they are equal; only the sign is meaningful
     */
    public int doCompare(String o1, String o2) {
        CumulativeLevel l1 = getLevel(o1);
        CumulativeLevel l2 = getLevel(o2);
        int byLevel = Integer.compare(l2.getL(), l1.getL());
        if (byLevel != 0) {
            return byLevel;
        }
        return Double.compare(l2.getC(), l1.getC());
    }

    /**
     * Writes two ranking sheets (incoming and outgoing cumulative traffic, largest first) to
     * {@code IfTop.xls}.
     */
    private void saveToExcel(List<ConnectInfo> connectInfos) {
        try (Workbook wb = new HSSFWorkbook()) {
            // Incoming ranking.
            connectInfos.sort(new InputComparator());
            createSheet(wb.createSheet("入网"), connectInfos, ConnectInfo::getInputCumulative);

            // Outgoing ranking.
            connectInfos.sort(new OutputComparator());
            createSheet(wb.createSheet("出网"), connectInfos, ConnectInfo::getOutCumulative);

            // Open the file only once the workbook is complete, and always close it again.
            try (FileOutputStream out = new FileOutputStream(new File("IfTop.xls"))) {
                wb.write(out);
            }
        } catch (IOException e) {
            System.err.println("Failed to write IfTop.xls: " + e);
        }
    }

    /**
     * Fills {@code sheet} with a header row and one row per connection, in list order.
     *
     * <p>Rows beyond the .xls sheet limit are dropped with a warning; because the list is sorted
     * largest first, the dropped rows are the smallest ones.
     *
     * @param function extracts the value shown in the traffic column
     */
    private void createSheet(Sheet sheet, List<ConnectInfo> connectInfos, Function<ConnectInfo, String> function) {
        Row row = sheet.createRow(0);
        row.createCell(0).setCellValue("IP:PORT");
        row.createCell(1).setCellValue("IP:PORT");
        row.createCell(2).setCellValue("总流量");

        int rows = Math.min(connectInfos.size(), MAX_XLS_ROWS - 1);
        if (rows < connectInfos.size()) {
            System.err.println("Sheet limited to " + rows + " of " + connectInfos.size() + " connections");
        }
        for (int i = 0; i < rows; i++) {
            ConnectInfo info = connectInfos.get(i);
            row = sheet.createRow(i + 1);
            row.createCell(0).setCellValue(info.getValue1());
            row.createCell(1).setCellValue(info.getValue2());
            row.createCell(2).setCellValue(function.apply(info));
        }
    }

    /** One connection: the two endpoints and the traffic columns of each direction. */
    static class ConnectInfo {
        /** Host taken from the {@code <=} line. */
        private String value1;
        /** Host taken from the {@code =>} line. */
        private String value2;
        private FlowInfo input;
        private FlowInfo out;

        /** Map key identifying the connection: both hosts separated by a space. */
        public String getKey() {
            return value1 + " " + value2;
        }

        public void setInput(String value1, FlowInfo input) {
            this.value1 = value1;
            this.input = input;
        }

        public void setOut(String value2, FlowInfo out) {
            this.out = out;
            this.value2 = value2;
        }

        /**
         * Replaces this object's traffic data with the data of {@code connectInfo}, so the most
         * recently parsed report wins. Nothing is summed.
         */
        public void merge(ConnectInfo connectInfo) {
            this.input = connectInfo.input;
            this.out = connectInfo.out;
        }

        public String getInputCumulative() {
            return input.getCumulative();
        }

        public String getOutCumulative() {
            return out.getCumulative();
        }

        public String getValue1() {
            return value1;
        }

        public String getValue2() {
            return value2;
        }

        @Override
        public String toString() {
            return "{" +
                    "input=" + input +
                    ", out=" + out +
                    '}';
        }
    }

    /** The four traffic columns of one direction of a connection, kept as the raw strings. */
    static class FlowInfo {
        private final String last2;
        private final String last10;
        private final String last40;
        private final String cumulative;

        FlowInfo(String last2, String last10, String last40, String cumulative) {
            this.last2 = last2;
            this.last10 = last10;
            this.last40 = last40;
            this.cumulative = cumulative;
        }

        public String getLast2() {
            return last2;
        }

        public String getLast10() {
            return last10;
        }

        public String getLast40() {
            return last40;
        }

        public String getCumulative() {
            return cumulative;
        }

        @Override
        public String toString() {
            return "{" +
                    "last2='" + last2 + '\'' +
                    ", last10='" + last10 + '\'' +
                    ", last40='" + last40 + '\'' +
                    ", cumulative='" + cumulative + '\'' +
                    '}';
        }
    }

    /** Orders connections by cumulative incoming traffic, largest first. */
    class InputComparator implements Comparator<ConnectInfo> {

        @Override
        public int compare(ConnectInfo o1, ConnectInfo o2) {
            return doCompare(o1.getInputCumulative(), o2.getInputCumulative());
        }
    }

    /** Orders connections by cumulative outgoing traffic, largest first. */
    class OutputComparator implements Comparator<ConnectInfo> {

        @Override
        public int compare(ConnectInfo o1, ConnectInfo o2) {
            return doCompare(o1.getOutCumulative(), o2.getOutCumulative());
        }
    }

    /** A parsed size: the number {@code c} and the unit level {@code l} (0 = unknown ... 4 = GB). */
    static class CumulativeLevel {
        private final double c;
        private final int l;

        CumulativeLevel(double c, int l) {
            this.c = c;
            this.l = l;
        }

        public double getC() {
            return c;
        }

        public int getL() {
            return l;
        }
    }

    /**
     * Orders log files chronologically by name.
     *
     * <p>Rolled files ({@code info-<date>.<index>.log}) are ordered by date, then index;
     * {@code info.log} always comes last. Any other name sorts before the rolled files, in
     * alphabetical order, instead of making the sort fail.
     */
    static class FileNameComparator implements Comparator<File> {
        private static final String ACTIVE_LOG = "info.log";
        private static final String ROLLED_PREFIX = "info-";
        private static final String LOG_SUFFIX = ".log";

        @Override
        public int compare(File o1, File o2) {
            String name1 = o1.getName();
            String name2 = o2.getName();
            DateIndex v1 = parse(name1);
            DateIndex v2 = parse(name2);

            int byRank = Integer.compare(rank(name1, v1), rank(name2, v2));
            if (byRank != 0) {
                return byRank;
            }
            // Equal rank means both names parsed or neither did.
            if (v1 == null || v2 == null) {
                return name1.compareTo(name2);
            }
            int dateCompare = v1.getDate().compareTo(v2.getDate());
            if (dateCompare != 0) {
                return dateCompare;
            }
            return Integer.compare(v1.getIndex(), v2.getIndex());
        }

        /** 2 for the active {@code info.log}, 1 for a parsable rolled file, 0 for anything else. */
        private static int rank(String name, DateIndex parsed) {
            if (ACTIVE_LOG.equals(name)) {
                return 2;
            }
            return parsed != null ? 1 : 0;
        }

        /** Returns the date/index encoded in a rolled file name, or {@code null} if there is none. */
        private static DateIndex parse(String fileName) {
            if (!fileName.startsWith(ROLLED_PREFIX) || !fileName.endsWith(LOG_SUFFIX)) {
                return null;
            }
            // Plain substring instead of regex replaceAll, where "." would match any character.
            String core = fileName.substring(ROLLED_PREFIX.length(), fileName.length() - LOG_SUFFIX.length());
            try {
                return new DateIndex(core);
            } catch (IllegalArgumentException | DateTimeParseException e) {
                return null;
            }
        }
    }

    /** Date and running index taken from a rolled log file name. */
    static class DateIndex {
        private final LocalDate date;
        private final int index;

        /**
         * @param input text of the form {@code yyyy-MM-dd.<index>}, e.g. {@code 2022-02-11.3}
         * @throws IllegalArgumentException if the index part is missing or not a number
         * @throws DateTimeParseException if the date part is not a valid {@code yyyy-MM-dd} date
         */
        public DateIndex(String input) {
            String[] values = input.split("[.]");
            if (values.length < 2) {
                throw new IllegalArgumentException("Expected <date>.<index> but got: " + input);
            }
            this.date = LocalDate.parse(values[0], DATE_FORMAT);
            this.index = Integer.parseInt(values[1]);
        }

        public LocalDate getDate() {
            return date;
        }

        public int getIndex() {
            return index;
        }
    }

    /** Simple mutable counter. */
    static class Count {
        private int count;

        public int get() {
            return count;
        }

        public void add() {
            this.count++;
        }

        public void set(int v) {
            this.count = v;
        }
    }
}

application.properties

# Directory the log files are written to; logback-spring.xml reads this property as ${log.path}.
game.logs.path=logs

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <contextName>logback</contextName>
    <springProperty scope="context" name="app_name" source="spring.application.name"/>
    <springProperty scope="context" name="log.path" source="game.logs.path"/>
    <!-- Console output. The pattern ends directly after %n: a trailing space there would be
         written at the start of every following line. -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %msg%n</pattern>
        </encoder>
    </appender>

    <!-- No filter: this file receives events of every level, ERROR included. -->
    <appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <File>${log.path}/info.log</File>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- info.log is the active file. It is rolled over when the day changes (the %d
                 pattern has day granularity) and whenever it grows beyond maxFileSize.
                 Rolled files are named e.g. info-2022-02-11.0.log, where %i is a counter that
                 increases with every size-triggered rollover within the same day. -->
            <fileNamePattern>${log.path}/info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>1MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>60</maxHistory>
            <totalSizeCap>20GB</totalSizeCap>
        </rollingPolicy>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%d{HH:mm:ss.SSS} %msg%n</pattern>
        </layout>
    </appender>

    <!-- File output restricted to level ERROR and above. -->
    <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>ERROR</level>
        </filter>
        <File>${log.path}/error.log</File>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}/error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>1MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>60</maxHistory>
            <totalSizeCap>20GB</totalSizeCap>
        </rollingPolicy>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%d{HH:mm:ss.SSS} %msg%n</pattern>
        </layout>
    </appender>
    <root level="info">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="INFO_FILE" />
        <appender-ref ref="ERROR_FILE" />
    </root>

    <!-- dev profile: loggers under "com" write to the console only, at DEBUG. -->
    <springProfile name="dev">
        <logger name="com" level="DEBUG" additivity="false">
            <appender-ref ref="STDOUT" />
        </logger>
    </springProfile>

    <!-- test/prod profiles: loggers under "com" write to the two files only, at INFO. -->
    <springProfile name="test,prod">
        <logger name="com" level="INFO" additivity="false">
            <appender-ref ref="INFO_FILE" />
            <appender-ref ref="ERROR_FILE" />
        </logger>
    </springProfile>
</configuration>

pom.xml 引用 poi 生成 excel

<!-- Apache POI: supplies the org.apache.poi workbook/sheet/row classes used to write the Excel report. -->
<dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>5.2.0</version>
        </dependency>

打包 生成 iftop-1.0.0.jar 运行

# The shell opens logs/out.log and logs/err.log before the JVM starts, so the directory must exist.
mkdir -p logs
# JVM options are placed before -jar (conventional order). The application name is passed as
# spring.application.name, which is the property logback-spring.xml reads.
nohup /data/java/bin/java -server -Xms512m -Xmx512m -jar iftop-1.0.0.jar --spring.profiles.active=prod --spring.application.name=iftop 1>>logs/out.log 2>>logs/err.log &
# Follow both output files.
tail -f logs/out.log logs/err.log

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐