整个8月没写一篇技术文章,特地趁着空闲整理出点东西

应用场景:在netty通讯服务里个整个任务,当设备一连上服务器,就可以通过定时器向设备发送从其他消息队列接收到的消息(类如kafka等),但要保证对目前的netty性能不产生大影响。

幸好之前有创建过netty搭建的IM通讯服务,所以拿他当成自己的demo项目(如何没看过可以查看我前面的IM通讯文章)

实现要点如下

一.改造ChannelInboundHandlerAdapter(渠道绑定拦截器适配器)内容

1.主要方法有以下几个

//channel添加方法

handlerAdded(ChannelHandlerContext ctx)

//channel移除时

handlerRemoved(ChannelHandlerContext context)

//channel活跃时

channelActive(ChannelHandlerContext ctx)

//channel接收到信息时

channelRead(ChannelHandlerContext ctx, Object msg)

//channel异常时

exceptionCaught(ChannelHandlerContext ctx,Throwable cause)

其中handlerAdded先执行,channelActive后执行,handlerRemoved在失去连接时执行,channel异常时

二实现业务场景

1.创建类继承ChannelInboundHandlerAdapter,创建一个静态组

public class ServerHandlers extends ChannelInboundHandlerAdapter {
private static ChannelGroup channels =new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
...
}

2.在handlerAdded方法将channel加入channels

//加入ChannelGroup
if(加入条件){
    channels.add(ctx.channel());
}
log.info(ctx.channel().id()+"的设备上线"+"Online:"+channels.size());

3.将定时任务Task放在channelActive方法里

       //启动调度任务
        Task task =new Task(ctx.channel());
        //判断条件
        if(channels.contains(ctx.channel())){
            ctx.channel().eventLoop().scheduleAtFixedRate(task, 1,3L, TimeUnit.SECONDS);
        }

定时任务暂时如下:

  

public class Task implements Runnable {

    @Override
    public void run() {
        System.out.printf("任务开始\n");
        try {
            TimeUnit.SECONDS.sleep(2);
            channel.writeAndFlush("服务器端推送");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("任务结束\n");
    }
   public Task(){

   }
   public Task(Channel channel){
       this.channel  =channel;
   }
}

结果如下:

三.额外补充

注意:1.查看很多文章说放在redis里比较合适,但channelId其实是无法实例化的(可能用的不是ID但是复杂),所以采用了静                 态组概念

          2.Netty 限制每个Channel 都由一个 Thread 处理,这种设计适用于非阻塞 IO 操作,但是NETTY本身是支持多线程

          3.异常处理方法可以exceptionCaught方法处理,代码如下

@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause)throws Exception{
  //移除静态组  
    channels.remove(ctx.channel());
//关闭调度任务
    ctx.channel().eventLoop().shutdownGracefully();
  log.info(ctx.channel().id()+"的设备出错"+"Online:"+channels.size());
//关闭链接
  ctx.close();
}

 可以 在handlerRemoved()方法也添加此方法   

         4.channel的ID实例化过程是随机值,注意1一点提到无法实例化,这里的判断条件就很难根据ID限定,但我们要根据实际业务变通,类如我的业务在设备链接时,需要发送心跳包检查的,发送的信息里由包含设备参数ID的,这样我可以将channels的add方法放在channelRead,根据传递msg将适合参数的ID的添加。这样定时任务会触发在channelRead方法之后,一样实现了业务

四 jmeter测试Netty长连接服务

1创建线程组

2.创建TCP取样器,保持长连接(也可以大小断言,通过断言结果对断言进行采样)

3创建固定定时器

4.创建聚合报告

测试开始!

而运行段后出现TCP链接掉线情况,很多都是由于超时(2S)

检查原因,发现Netty客户端所读取的信息如下

出现由于线程过多出现信息发送粘连问题,这里暂时没深入研究,下次可以有机会仔细研究

信息粘连的最佳解决方案:

原理

 

channel之前的message实际上是ByteBuf,我们可以规定以某些结尾(也可以自定义),这里直接使用base(以分隔符“\r\n”为分割的),最大是1024字节,这里是测试结果

修改后客户端输入:

 

服务端无显示:

加入"\r\n"后:

 

 

 

错误集锦:

1.org.apache.jmeter.protocol.tcp.sampler.ReadException: Error reading from server, bytes read: 0

jmeterTCP协议读取异常

2.

eventLoop循环事件在没有监听的情况下停止,有个观点是eventLoop和channel其实是绑定关系,chanenl在设备下线时未将channel移除,也可以采用

ScheduledFuture<?> future=ch.cancel(false);关闭循环任务

少量客户端没有出现此类问题,却在jmeter得到了这么多问题,问题越多就越接近真实。。。

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐