博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty实现长连接心跳检
阅读量:6479 次
发布时间:2019-06-23

本文共 4430 字,大约阅读时间需要 14 分钟。

主要逻辑

使用netty实现长连接,主要靠心跳来维持服务器端及客户端连接。

实现的逻辑主要是:

服务器端方面

 

1, 服务器在网络空闲操作一定时间后,服务端失败心跳计数器加1。

2, 如果收到客户端的ping心跳包,则清零失败心跳计数器,如果连续n次未收到客户端的ping心跳包,则关闭链路,释放资源,等待客户端重连。

客户端方面

 

1, 客户端网络空闲在一定时间内没有进行写操作时,则发送一个ping心跳包。

2, 如果服务器端未在发送下一个心跳包之前回复pong心跳应答包,则失败心跳计数器加1。

3, 如果客户端连续发送n(此处根据具体业务进行定义)次ping心跳包,服务器端均未回复pong心跳应答包,则客户端断开连接,间隔一定时间进行重连操作,直至连接服务器成功。

环境:netty5,tomcat7,jdk7,myeclipse

服务器端心跳处理类:

 

[java]
 
  1. public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {   
  2.     private  final Logger log=Logger.getLogger(HeartBeatRespHandler.class);  
  3.        //线程安全心跳失败计数器  
  4.        private AtomicInteger unRecPingTimes = new AtomicInteger(1);  
  5.        @Override  
  6.        public void channelRead(ChannelHandlerContext ctx, Object msg)    
  7.                 throws Exception {    
  8.            NettyMessageProto message = (NettyMessageProto)msg;  
  9.            unRecPingTimes = new AtomicInteger(1);  
  10.            //接收客户端心跳信息  
  11.            if(message.getHeader() != null  && message.getHeader().getType() == Constants.MSGTYPE_HEARTBEAT_REQUEST){  
  12.                 //清零心跳失败计数器  
  13.                 log.info("server receive client"+ctx.channel().attr(SysConst.SERIALNO_KEY)+" ping msg :---->"+message);  
  14.                 //接收客户端心跳后,进行心跳响应  
  15.                 NettyMessageProto replyMsg = buildHeartBeat();  
  16.                 ctx.writeAndFlush(replyMsg);  
  17.             }else{  
  18.                 ctx.fireChannelRead(msg);  
  19.             }  
  20.         }  
  21.          
  22.          
  23.         /** 
  24.          * 事件触发器,该处用来处理客户端空闲超时,发送心跳维持连接。 
  25.          */  
  26.         @Override  
  27.         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {    
  28.             if (evt instanceof IdleStateEvent) {    
  29.                 IdleStateEvent event = (IdleStateEvent) evt;    
  30.                 if (event.state() == IdleState.READER_IDLE) {    
  31.                     /*读超时*/    
  32.                     log.info("===服务器端===(READER_IDLE 读超时)");  
  33.                     unRecPingTimes.getAndIncrement();   
  34.                   //客户端未进行ping心跳发送的次数等于3,断开此连接  
  35.                     if(unRecPingTimes.intValue() == 3){    
  36.                           
  37.                           ctx.disconnect();  
  38.                           System.out.println("此客户端连接超时,服务器主动关闭此连接....");  
  39.                           log.info("此客户端连接超时,服务器主动关闭此连接....");  
  40.                     }   
  41.                 } else if (event.state() == IdleState.WRITER_IDLE) {    
  42.                     /*服务端写超时*/       
  43.                     log.info("===服务器端===(WRITER_IDLE 写超时)");  
  44.                       
  45.                 } else if (event.state() == IdleState.ALL_IDLE) {    
  46.                     /*总超时*/    
  47.                     log.info("===服务器端===(ALL_IDLE 总超时)");    
  48.                 }    
  49.             }    
  50.         }  
  51.           
  52.          
  53.        /** 
  54.         * 创建心跳响应消息 
  55.         * @return 
  56.         */  
  57.        private NettyMessageProto buildHeartBeat(){  
  58.            HeaderProto header = HeaderProto.newBuilder().setType(Constants.MSGTYPE_HEARTBEAT_RESPONSE).build();  
  59.            NettyMessageProto message =NettyMessageProto.newBuilder().setHeader(header).build();  
  60.            return message;  
  61.        }  
客户端心跳处理类:

 

 

[java]
 
  1. public class HeartBeatReqHandler extends ChannelHandlerAdapter {  
  2.     private  final Logger log=Logger.getLogger(HeartBeatReqHandler.class);  
  3.       
  4.     //线程安全心跳失败计数器  
  5.     private AtomicInteger unRecPongTimes = new AtomicInteger(1);  
  6.       
  7.     public void channelRead(ChannelHandlerContext ctx, Object msg)    
  8.             throws Exception {    
  9.         NettyMessageProto message = (NettyMessageProto)msg;    
  10.         //服务器端心跳回复  
  11.         if(message.getHeader() != null  && message.getHeader().getType() == Constants.MSGTYPE_HEARTBEAT_RESPONSE){  
  12.             //如果服务器进行pong心跳回复,则清零失败心跳计数器  
  13.             unRecPongTimes = new AtomicInteger(1);  
  14.             log.debug("client receive server pong msg :---->"+message);  
  15.         }else{  
  16.             ctx.fireChannelRead(msg);  
  17.         }  
  18.     }    
  19.       
  20.     /** 
  21.      * 事件触发器,该处用来处理客户端空闲超时,发送心跳维持连接。 
  22.      */  
  23.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {    
  24.         if (evt instanceof IdleStateEvent) {    
  25.             IdleStateEvent event = (IdleStateEvent) evt;    
  26.             if (event.state() == IdleState.READER_IDLE) {    
  27.                 /*读超时*/    
  28.                 log.info("===客户端===(READER_IDLE 读超时)");  
  29.             } else if (event.state() == IdleState.WRITER_IDLE) {    
  30.                 /*客户端写超时*/       
  31.                 log.info("===客户端===(WRITER_IDLE 写超时)");  
  32.                 unRecPongTimes.getAndIncrement();    
  33.                 //服务端未进行pong心跳响应的次数小于3,则进行发送心跳,否则则断开连接。  
  34.                 if(unRecPongTimes.intValue() < 3){    
  35.                     //发送心跳,维持连接  
  36.                     ctx.channel().writeAndFlush(buildHeartBeat()) ;   
  37.                     log.info("客户端:发送心跳");  
  38.                 }else{    
  39.                     ctx.channel().close();    
  40.                 }    
  41.             } else if (event.state() == IdleState.ALL_IDLE) {    
  42.                 /*总超时*/    
  43.                 log.info("===客户端===(ALL_IDLE 总超时)");    
  44.             }    
  45.         }    
  46.     }  
  47.           
  48.     private NettyMessageProto buildHeartBeat(){  
  49.         HeaderProto header = HeaderProto.newBuilder().setType(Constants.MSGTYPE_HEARTBEAT_REQUEST).build();  
  50.         NettyMessageProto  message = NettyMessageProto.newBuilder().setHeader(header).build();  
  51.         return message;  
  52.     }  
  53.       
  54.     /** 
  55.      * 异常处理 
  56.      */  
  57.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception{  
  58.         ctx.fireExceptionCaught(cause);  
  59.     }  
  60.   
  61. }  

 

[java]
 
  1. <pre code_snippet_id="2489110" snippet_file_name="blog_20170719_2_6056366" name="code" class="java"><pre code_snippet_id="2489110" snippet_file_name="blog_20170719_2_6056366"></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre><pre></pre></pre>  
  2. <pre></pre>  
  3. <pre></pre>  
  4. <pre></pre>  
  5. <pre></pre>  
  6. <link rel="stylesheet" href="http://static.blog.csdn.net/public/res-min/markdown_views.css?v=1.0">  
  7.                         
 
 
你可能感兴趣的文章
MySQL常见错误代码及代码说明
查看>>
Cglib动态代理基础使用
查看>>
设计模式 - 单例模式
查看>>
react native参考资料
查看>>
技术人员,为什么会苦逼
查看>>
使用126邮箱发送邮件的python脚本
查看>>
关于IP SLA及与EEM联动的探讨(转)
查看>>
DHCP在VLAN中的端口
查看>>
Maven
查看>>
课后习题和问题 Chapter 2 Problems 10-18
查看>>
缓存系统在游戏业务中的特异性
查看>>
Ngrok搭建自己的内网穿透
查看>>
在高性能的IO体系设计中,有几个名词概念常常会使我们感到迷惑不解。具体如下:...
查看>>
聊聊Java并发面试问题之公平锁与非公平锁是啥?
查看>>
linux cron计划任务
查看>>
针对nginx应用场景的配置 知识整理
查看>>
delegate、notification、KVO场景差别
查看>>
HDOJ 1166.敌兵布阵
查看>>
SQL Server性能优化(8)堆表结构介绍
查看>>
Opencv关于滑动条bar操作的实例
查看>>