为啥netty超时连接netty 关闭客户端连接不掉

为啥netty超时连接关闭不掉呢?
pipeline.addLast(&timeoutHandler&, new TimeoutHandler(timer, timeout));
public class TimeoutHandler extends IdleStateHandler{ &&& private Logger logger = LoggerFactory.getLogger(this.getClass());
&&& public TimeoutHandler(Timer timer, int seconds) { &&&&&&& super(timer, seconds, seconds, 0); &&& }
&&& @Override &&& protected void channelIdle(ChannelHandlerContext ctx, IdleState state, long lastActivityTimeMillis) throws Exception { &&&&&&& super.channelIdle(ctx, state, lastActivityTimeMillis);
&&&&&&& logger.error(&channel id: & + ctx.getChannel().getId()); &&&&&&& ctx.getChannel().close();
&&&&&&& logger.error(&state: & + ctx.getChannel().isOpen()); &&& } }
&连接空闲超时,大部分在channelIdle里都能关掉,但现在发现,有个客户端ip地址,空闲的连接在channelIdle里一直关不掉,虽然close()了,但下面打印出来的state依然是true。并且隔段时间,同样的channel id又在日志出现。各位大侠,哪位知道怎么回事?
ctx.getChannel().close().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
logger.error(&state:& + future.getChannel().isOpen());
}); Netty 的操作几乎所有都是异步的,信道close是一个异步操作,很频繁的时候,马上调用 isOpen,有很大几率是还是打开状态。netty长连接服务器断开后,客户端如何重新连接 - ITeye问答
private static Bootstrap b = new Bootstrap();
public static void start() throws Exception {
&&& EventLoopGroup workerGroup = new NioEventLoopGroup();
// b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.option(ChannelOption.ALLOW_HALF_CLOSURE,true);
&&&&&&& b.handler(new ChannelInitializer&SocketChannel&() {
&&&&&&&&&&& @Override
&&&&&&&&&&& protected void initChannel(SocketChannel ch) throws Exception {
&&&&&&&&&&&&&&& ChannelPipeline pipeline = ch.pipeline();
&&&&&&&&&&&&&&& // 以("\n")为结尾分割的 解码器
&&&&&&&&&&&&&&& pipeline.addLast("framer", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()));
&&&&&&&&&&&&&&& // 字符串解码 和 编码
&&&&&&&&&&&&&&& pipeline.addLast("decoder", new StringDecoder());
&&&&&&&&&&&&&&& pipeline.addLast("encoder", new StringEncoder());
&&&&&&&&&&&&&&& // 自己的逻辑Handler
&&&&&&&&&&&&&&& pipeline.addLast("handler", new SimpleChannelInboundHandler() {
&&&&&&&&&&&&&&&&&&& @Override
&&&&&&&&&&&&&&&&&&& protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
&&&&&&&&&&&&&&&&&&&&&&& logger.debug("接收到的状态内容:" + o.toString());
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&&&&&& @Override
&&&&&&&&&&&&&&&&&&& public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
&&&&&&&&&&&&&&&&&&&&&&& logger.debug("连接成功");
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&&&&&& @Override
&&&&&&&&&&&&&&&&&&& public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&&&&&& @Override
&&&&&&&&&&&&&&&&&&& public void channelActive(ChannelHandlerContext ctx) throws Exception {
&&&&&&&&&&&&&&&&&&&&&&& logger.debug("======channelActive========");
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&&&&&& @Override
&&&&&&&&&&&&&&&&&&& public void channelInactive(ChannelHandlerContext ctx) throws Exception {
&&&&&&&&&&&&&&&&&&&&&&& logger.debug("======channelInactive========");
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&&&&&& @Override
&&&&&&&&&&&&&&&&&&& public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
&&&&&&&&&&&&&&&&&&&&&&& logger.debug("======exceptionCaught========"+cause.getMessage());&&&&&&&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&&&& }
&&&&&&&&&&&&&&& });
&&&&&&&&&&& }
&&&&&&& });
&&&&&&& // Start the client.
&&&&&&& ChannelFuture f = b.connect("127.0.0.1",8060).sync(); // (5)
&&&&&&& // Wait until the connection is closed.
&&&&&&& f.channel().closeFuture().sync();
&&& }finally{
&&&&&&&&&&& workerGroup.shutdownGracefully();
ConnectionListener这个代码可否贴出,tkx
前2天才实现了这个,后来整个设计又把长连接改成短连接..
就是短线重连,服务器重启的时候,等服务器启动好了,能让客户端重连上,对吧。
由于采用的是长连接,因此当connection断掉后,你client端的channelClosed是能感知到的。在这里写一个scheduleExecute,然后不断连接,直到成功。
长连接,就是不允许断,那么只要close你就去连就可以了
public class MyInboundHandler extends SimpleChannelInboundHandler {
public MyInboundHandler(Client client) {
this.client =
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(new Runnable() {
public void run() {
client.createBootstrap(new Bootstrap(), eventLoop);
}, 1L, TimeUnit.SECONDS);
super.channelInactive(ctx);
public class Client
private EventLoopGroup loop = new NioEventLoopGroup();
public static void main( String[] args )
new Client().run();
public Bootstrap createBootstrap(Bootstrap bootstrap, EventLoopGroup eventLoop) {
if (bootstrap != null) {
final MyInboundHandler handler = new MyInboundHandler(this);
bootstrap.group(eventLoop);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer&SocketChannel&() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(handler);
bootstrap.remoteAddress("localhost", 8888);
bootstrap.connect().addListener(new ConnectionListener(this));
public void run() {
createBootstrap(new Bootstrap(), loop);
这样不可以吗:服务端正常关闭时,会触发客户端的channelInactive事件,在这个事件里面重新连接服务器
已解决问题
未解决问题通信框架netty5.0课程二:netty超时心跳机制 - 互联网当前位置:& &&&通信框架netty5.0课程二:netty超时心跳机制通信框架netty5.0课程二:netty超时心跳机制&&网友分享于:&&浏览:0次通信框架netty5.0教程二:netty超时心跳机制
上一章已经讲了如何搭建一个简单的netty server,这一章讲一下netty超时心跳机制。一般应用场景是client在一定时间未收到server端数据时给server端发送心跳请求,server收到心跳请求后发送一个心跳包给client端,以此维持通信。发送心跳由client执行,server端反馈心跳就可以了,好了不多说了,上代码:netty server代码:
import java.util.concurrent.TimeU
import org.apache.log4j.L
import io.netty.bootstrap.ServerB
import io.netty.channel.ChannelF
import io.netty.channel.ChannelI
import io.netty.channel.ChannelO
import io.netty.channel.ChannelP
import io.netty.channel.EventLoopG
import io.netty.channel.nio.NioEventLoopG
import io.netty.channel.socket.SocketC
import io.netty.channel.socket.nio.NioServerSocketC
import io.netty.handler.codec.LengthFieldBasedFrameD
import io.netty.handler.timeout.IdleStateH
public class NettyServerBootstrap {
private static Logger logger = Logger.getLogger(NettyServerBootstrap.class);
public NettyServerBootstrap(int port) {
this.port =
private void bind() {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childHandler(new ChannelInitializer&SocketChannel&() {
protected void initChannel(SocketChannel socketChannel)
throws Exception {
ChannelPipeline p = socketChannel.pipeline();
// 超时处理:参数分别为读超时时间、写超时时间、读和写都超时时间、时间单位
p.addLast(new IdleStateHandler(15, 30, 30, TimeUnit.SECONDS));
p.addLast(new NettyIdleStateHandler());
p.addLast(new NettyServerHandler());
ChannelFuture f = bootstrap.bind(port).sync();
if (f.isSuccess()) {
logger.debug("启动Netty服务成功,端口号:" + this.port);
// 关闭连接
f.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("启动Netty服务异常,异常信息:" + e.getMessage());
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
server端NettyIdleStateHandler:
import org.apache.log4j.L
import io.netty.channel.ChannelHandlerA
import io.netty.channel.ChannelHandlerC
import io.netty.handler.timeout.IdleS
import io.netty.handler.timeout.IdleStateE
* 处理超时连接handler
* @author Guo Kaixuan
public class NettyIdleStateHandler extends ChannelHandlerAdapter {
private static Logger logger = Logger
.getLogger(NettyIdleStateHandler.class);
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent)
if (event.state().equals(IdleState.READER_IDLE)) {
logger.warn("Netty服务器在通道" + ctx.channel().id() + "上读超时,已将该通道关闭");
} else if (event.state().equals(IdleState.WRITER_IDLE)) {
logger.warn("Netty服务器在通道" + ctx.channel().id() + "上写超时,已将该通道关闭");
} else if (event.state().equals(IdleState.ALL_IDLE)) {
logger.warn("Netty服务器在通道" + ctx.channel().id()
+ "上读&写超时,已将该通道关闭");
super.userEventTriggered(ctx, evt);
server端NettyServerHandler
import java.io.UnsupportedEncodingE
import org.apache.log4j.L
import io.netty.buffer.ByteB
import io.netty.channel.ChannelHandlerA
import io.netty.channel.ChannelHandlerC
import io.netty.channel.socket.SocketC
public class NettyServerHandler extends ChannelHandlerAdapter {
private static Logger logger = Logger.getLogger(NettyServerHandler.class);
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf)
String recieved = getMessage(buf);
System.out.println("服务器接收到消息:" + recieved);
ctx.writeAndFlush(newPing("服务器已收到心跳包"));
} catch (Exception e) {
ServiceLoggerUtils.error("通信异常");
* 从ByteBuf中获取信息 使用UTF-8编码返回
private String getMessage(ByteBuf buf) {
byte[] con = new byte[buf.readableBytes()];
buf.readBytes(con);
return new String(con, Constant.UTF8);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
public static ByteBuf newPing(String message) {
byte[] mes = message.getBytes();
ByteBuf pingMessage = Unpooled.buffer();
pingMessage.writeBytes(mes);
return pingM
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
client端代码:
import java.util.concurrent.TimeU
import io.netty.bootstrap.B
import io.netty.channel.ChannelF
import io.netty.channel.ChannelI
import io.netty.channel.ChannelO
import io.netty.channel.EventLoopG
import io.netty.channel.nio.NioEventLoopG
import io.netty.channel.socket.SocketC
import io.netty.channel.socket.nio.NioSocketC
import io.netty.handler.codec.LengthFieldBasedFrameD
import io.netty.handler.timeout.IdleStateH
public class NettyClientBootstrap {
* 服务器端口号
* 服务器IP
@SuppressWarnings("unused")
private SocketChannel socketC
public NettyClientBootstrap(int port, String host)
throws InterruptedException {
this.port =
this.host =
private void start() throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.group(eventLoopGroup);
bootstrap.remoteAddress(host, port);
bootstrap.handler(new ChannelInitializer&SocketChannel&() {
protected void initChannel(SocketChannel socketChannel)
throws Exception {
//超时处理:参数分别为读超时时间、写超时时间、读和写都超时时间、时间单位
socketChannel.pipeline().addLast(new IdleStateHandler(3, 8, 0, TimeUnit.SECONDS));
socketChannel.pipeline().addLast(new NettyIdleStateHandler());
socketChannel.pipeline().addLast(new NettyClientHandler());
ChannelFuture future = bootstrap.connect(host, port).sync();
if (future.isSuccess()) {
socketChannel = (SocketChannel) future.channel();
System.out.println("----------------connect server success----------------");
future.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
public static void main(String[] args) throws InterruptedException {
NettyClientBootstrap bootstrap = new NettyClientBootstrap(9999,
"localhost");
client端NettyIdleStateHandler:
import io.netty.buffer.ByteB
import io.netty.buffer.U
import io.netty.channel.ChannelHandlerA
import io.netty.channel.ChannelHandlerC
import io.netty.handler.timeout.IdleS
import io.netty.handler.timeout.IdleStateE
* 处理超时连接handler
public class NettyIdleStateHandler extends ChannelHandlerAdapter {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent)
if (event.state().equals(IdleState.READER_IDLE)) {
String message = "心跳包";
byte[] req = message.getBytes();
ByteBuf pingMessage = Unpooled.buffer();
pingMessage.writeBytes(req);
ctx.writeAndFlush(pingMessage);
System.out.println("客户端读超时,已发送心跳");
} else if (event.state().equals(IdleState.WRITER_IDLE)) {
System.out.println("客户端写超时");
} else if (event.state().equals(IdleState.ALL_IDLE)) {
System.out.println("客户端读&写超时");
super.userEventTriggered(ctx, evt);
client端NettyClientHandler:
import io.netty.buffer.ByteB
import io.netty.buffer.U
import io.netty.channel.ChannelHandlerA
import io.netty.channel.ChannelHandlerC
public class NettyClientHandler extends ChannelHandlerAdapter
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
String result = getMessage((ByteBuf) msg);
System.out.print("客户端收到服务器响应数据:" + result);
private String getMessage(ByteBuf buf) {
byte[] con = new byte[buf.readableBytes()];
buf.readBytes(con);
return new String(con, Constant.UTF8);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
以上代码需要引入netty5包和log4j包,netty包上一张有将如何加入,log4j请自行下载或者maven添加。如有问题欢迎留言。
12345678910
12345678910
12345678910 上一篇:没有了下一篇:文章评论相关解决方案 1234567891011 Copyright & &&版权所有3147人阅读
初学Netty框架,入门时总会犯些低级错误,我把这些都记录下来,可以加深理解。
今天遇到的问题是ChannelFuture连接超时后会报异常从而导致客户端崩溃。
我是这么写的
ChannelFuture future = bootstrap.connect(host, port).sync();
sync()是同步调用,具体机制我还没大搞明白。
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,1000);
添加了一个超时设置,然后将sync()改为awaitUninterruptibly()
之后就可以判断future的状态从而执行相应的逻辑,而不会报异常。
socketChannel.pipeline().addLast(&idleStateHandler&,new IdleStateHandler(2000,2000,2000,TimeUnit.MILLISECONDS));
这是channel的读写超时,与ChannelFuture超时无关
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception{
System.out.println(&触发事件&);
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
ctx.close();
System.out.println(&READER_IDLE 读超时&);
} else if (e.state() == IdleState.WRITER_IDLE) {
ByteBuf buff = ctx.alloc().buffer();
buff.writeBytes(&mayi test&.getBytes());
ctx.writeAndFlush(buff);
System.out.println(&WRITER_IDLE 写超时&);
System.out.println(&其他超时&);
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:4916次
排名:千里之外
(1)(1)(1)(3)(5)(3)(1)Netty长连接的事件处理顺序问题 -
- ITeye技术网站
转载自:http://www.blogjava.net/hankchenNetty长连接的事件处理顺序问题博客分类: 网络开发+Mina+Netty最近的一个线上项目(认证服务器)老是出现服务延迟的情况。具体的问题描述:(1)客户端发送一个请求A(长连接),在服务器端的业务层需要20秒以上才能接收到。(2)客户端发送一个请求B(端连接),在服务器端的业务层可以迅速接收到。从现象大致知道问题出在服务器端的网络接收层,大量通过长连接发送过来的请求都堵塞在网络层得不到处理(在网络层排队,还没到应用层)。(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen)后来经过排查,发现是Netty中的OrderedMemoryAwareThreadPoolExecutor原因。相关代码如下:MemoryAwareThreadPoolExecutor executor = new OrderedMemoryAwareThreadPoolExecutor(threadNums, maxChannelMemorySize,
maxTotalMemorySize, keepAliveTime,
TimeUnit.SECONDS);ExecutionHandler executionHandler = new ExecutionHandler(executor);public ChannelPipeline getPipeline() throws Exception{
ChannelPipeline pipeline = pipeline();
pipeline.addLast("decoder", new AuthDecoder());
pipeline.addLast("encoder", new AuthEncoder());
pipeline.addLast("executor", executionHandler);
pipeline.addLast("handler", new AuthServerHandler(commandFactory));
}先介绍下背景知识,再来分析问题。大家都知道,Netty是一个基于事件的NIO框架。在Netty中,一切网络动作都是通过事件来传播并处理的,例如:Channel读、Channel写等等。回忆下Netty的流处理模型:Boss线程(一个服务器端口对于一个)---接收到客户端连接---生成Channel---交给Work线程池(多个Work线程)来处理。具体的Work线程---读完已接收的数据到ChannelBuffer---触发ChannelPipeline中的ChannelHandler链来处理业务逻辑。注意:执行ChannelHandler链的整个过程是同步的,如果业务逻辑的耗时较长,会将导致Work线程长时间被占用得不到释放,从而影响了整个服务器的并发处理能力。所以,为了提高并发数,一般通过ExecutionHandler线程池来异步处理ChannelHandler链(worker线程在经过ExecutionHandler后就结束了,它会被ChannelFactory的worker线程池所回收)。在Netty中,只需要增加一行代码:public ChannelPipeline getPipeline() {
return Channels.pipeline(
new DatabaseGatewayProtocolEncoder(),
new DatabaseGatewayProtocolDecoder(),
executionHandler, // Must be shared
new DatabaseQueryingHandler());}例如:ExecutionHandler executionHandler = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(16, 48576))对于ExecutionHandler需要的线程池模型,Netty提供了两种可选:1) MemoryAwareThreadPoolExecutor 通过对线程池内存的使用控制,可控制Executor中待处理任务的上限(超过上限时,后续进来的任务将被阻塞),并可控制单个Channel待处理任务的上限,防止内存溢出错误;2) OrderedMemoryAwareThreadPoolExecutor 是 MemoryAwareThreadPoolExecutor 的子类。除了MemoryAwareThreadPoolExecutor 的功能之外,它还可以保证同一Channel中处理的事件流的顺序性,这主要是控制事件在异步处理模式下可能出现的错误的事件顺序,但它并不保证同一Channel中的事件都在一个线程中执行(通常也没必要)。例如:Thread X: --- Channel A (Event A1) --.
.-- Channel B (Event B2) --- Channel B (Event B3) ---&
/ \Thread Y: --- Channel B (Event B1) --'
'-- Channel A (Event A2) --- Channel A (Event A3) ---&上图表达的意思有几个:(1)对整个线程池而言,处理同一个Channel的事件,必须是按照顺序来处理的。例如,必须先处理完Channel A (Event A1) ,再处理Channel A (Event A2)、Channel A (Event A3)(2)同一个Channel的多个事件,会分布到线程池的多个线程中去处理。(3)不同Channel的事件可以同时处理(分担到多个线程),互不影响。
OrderedMemoryAwareThreadPoolExecutor 的这种事件处理有序性是有意义的,因为通常情况下,请求发送端希望服务器能够按照顺序处理自己的请求,特别是需要多次握手的应用层协议。例如:XMPP协议。现在回到具体业务上来,我们这里的认证服务也使用了OrderedMemoryAwareThreadPoolExecutor。认证服务的其中一个环节是使用长连接,不断处理来自另外一个服务器的认证请求。通信的数据包都很小,一般都是200个字节以内。一般情况下,处理这个过程很快,所以没有什么问题。但是,由于认证服务需要调用第三方的接口,如果第三方接口出现延迟,将导致这个过程变慢。一旦一个事件处理不完,由于要保持事件处理的有序性,其他事件就全部堵塞了!而短连接之所以没有问题,是因为短连接一个Channel就一个请求数据包,处理完Channel就关闭了,根本不存在顺序的问题,所以在业务层可以迅速收到请求,只是由于同样的原因(第三方接口),处理时间会比较长。其实,认证过程都是独立的请求数据包(单个帐号),每个请求数据包之间是没有任何关系的,保持这样的顺序没有意义!最后的改进措施:1、去掉OrderedMemoryAwareThreadPoolExecutor,改用MemoryAwareThreadPoolExecutor。2、减少调用第三方接口的超时时间,让处理线程尽早回归线程池。
yanzhongguan
浏览: 16792 次
来自: 杭州
用了Reactor Composable的代码的第三行S.ea ...}

我要回帖

更多关于 netty 客户端连接超时 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信