Netty客户端断线重连实现及问题思考( 六 )


Netty客户端断线重连实现及问题思考

文章插图
 
io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop
//Return true if the given Thread is executed in the event loop, false otherwise.@Overridepublic boolean inEventLoop(Thread thread) {return thread == this.thread;}重连的方法是在一个NioEventLoop(也就是io线程)上被调用,第1次重连实际上是选择了第2个NioEventLoop,第2次重连实际上是选择了第3个NioEventLoop,以此类推,当一轮选择过后,重新选到第一个NioEventLoop时,boolean inEventLoop()返回true,则抛出了BlockingOperationException 。
方案1不要在netty的io线程上执行同步连接,使用单独的线程池定时执行重试,该线程还可以执行自己重连的业务逻辑操作,不阻塞io线程 。(如果不需要业务操作之后销毁线程池) 。
com.bruce.netty.rpc.client.SimpleClientHandler 修改reconnection方法
private static ScheduledExecutorService SCHEDULED_EXECUTOR;private void initScheduledExecutor() { if (SCHEDULED_EXECUTOR == null) {synchronized (SimpleClientHandler.class) {if (SCHEDULED_EXECUTOR == null) {SCHEDULED_EXECUTOR = Executors.newSingleThreadScheduledExecutor(r -> {Thread t = new Thread(r, "Client-Reconnect-1");t.setDaemon(true);return t;});}} }}private void reconnection(ChannelHandlerContext ctx) { log.info("5s之后重新建立连接"); initScheduledExecutor(); SCHEDULED_EXECUTOR.schedule(() -> {boolean connect = client.connect();if (connect) {//连接成功,关闭线程池SCHEDULED_EXECUTOR.shutdown();log.info("重新连接成功");} else {reconnection(ctx);} }, 3, TimeUnit.SECONDS);}方案2可以在io线程上使用异步重连:
com.bruce.netty.rpc.client.NettyClient添加方法connectAsync方法,两者的区别在于connectAsync方法中没有调用channelFuture的同步等待方法 。而是改成监听器(ChannelFutureListener)的方式,实际上这个监听器是运行在io线程上 。
public void connectAsync() {log.info("尝试连接到服务端: 127.0.0.1:8088");ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);channelFuture.addListener((ChannelFutureListener) future -> {Throwable cause = future.cause();if (cause != null) {exceptionHandler(cause);log.info("等待下一次重连");channelFuture.channel().eventLoop().schedule(this::connectAsync, 5, TimeUnit.SECONDS);} else {clientChannel = channelFuture.channel();if (clientChannel != null && clientChannel.isActive()) {log.info("Netty client started !!! {} connect to server", clientChannel.localAddress());}}});}com.bruce.netty.rpc.client.SimpleClientHandler
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);private NettyClient client;public SimpleClientHandler(NettyClient client) {this.client = client;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("client receive:{}", msg);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.warn("channelInactive:{}", ctx.channel().localAddress());ctx.pipeline().remove(this);ctx.channel().close();reconnectionAsync(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof IOException) {log.warn("exceptionCaught:客户端[{}]和远程断开连接", ctx.channel().localAddress());} else {log.error(cause);}ctx.pipeline().remove(this);ctx.close();reconnectionAsync(ctx);}private void reconnectionAsync(ChannelHandlerContext ctx) {log.info("5s之后重新建立连接");ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {client.connectAsync();}}, 5, TimeUnit.SECONDS);}}netty客户端线程给多大比较合理 ?
netty中一个NioEventLoopGroup默认创建的线程数是cpu核心数 * 2 ,这些线程都是用于io操作,那么对于客户端应用程序来说真的需要这么多io线程么?
通过上面分析BlockingOperationException异常时我们分析到,实际上netty在创建一个Channel对象后只会从NioEventLoopGroup中选择一个NioEventLoop来绑定,只有创建多个Channel才会依次选择下一个NioEventLoop,也就是说一个Channel只会对应一个NioEventLoop,而NioEventLoop可以绑定多个Channel 。
1.对于客户端来说,如果只是连接的一个server节点,那么只要设置1条线程即可 。即使出现了断线重连,在连接断开之后,之前的Channel会从NioEventLoop移除 。重连之后,仍然只会在仅有的一个NioEventLoop注册一个新的Channel 。
2.如果客户端同时如下方式多次调用io.netty.bootstrap.Bootstrap#connect(String inetHost, int inetPort)连接多个Server节点,那么线程可以设置大一点,但不要超过2*c,而且只要出现断线重连,同样不能保证每个NioEventLoop都会绑定一个客户端Channel 。


推荐阅读