关于java:Netty-中的心跳机制还有谁不会

作者:rickiyang

出处:www.cnblogs.com/rickiyang/p/11074231.html

咱们晓得在TCP长连贯或者WebSocket长连贯中个别咱们都会应用心跳机制–即发送非凡的数据包来通告对方本人的业务还没有办完,不要敞开链接。

那么心跳机制能够用来做什么呢?

咱们晓得网络的传输是不牢靠的,当咱们发动一个链接申请的过程之中会产生什么事件谁都无奈意料,或者断电,服务器重启,断网线之类。

如果有这种状况的产生对方也无奈判断你是否还在线。所以这时候咱们引入心跳机制,在长链接中单方没有数据交互的时候相互发送数据(可能是空包,也可能是非凡数据),对方收到该数据之后也回复相应的数据用以确保单方都在线,这样就能够确保以后链接是无效的。

1. 如何实现心跳机制

个别实现心跳机制由两种形式:

  • TCP协定自带的心跳机制来实现;
  • 在应用层来实现。

然而TCP协定自带的心跳机制零碎默认是设置的是2小时的心跳频率。它查看不到机器断电、网线插入、防火墙这些断线。而且逻辑层解决断线可能也不是那么好解决。另外该心跳机制是与TCP协定绑定的,那如果咱们要是应用UDP协定岂不是用不了?所以个别咱们都不必。

而个别咱们本人实现呢大抵的策略是这样的:

  1. Client启动一个定时器,一直发送心跳;
  2. Server收到心跳后,做出回应;
  3. Server启动一个定时器,判断Client是否存在,这里做判断有两种办法:时间差和简略标识。

时间差:

  1. 收到一个心跳包之后记录以后工夫;
  2. 判断定时器达到工夫,计算多久没收到心跳工夫=以后工夫-上次收到心跳工夫。如果改工夫大于设定值则认为超时。

简略标识:

  1. 收到心跳后设置连贯标识为true;
  2. 判断定时器达到工夫,如果未收到心跳则设置连贯标识为false;

明天咱们来看一下Netty的心跳机制的实现,在Netty中提供了IdleStateHandler类来进行心跳的解决,它能够对一个 Channel 的 读/写设置定时器, 当 Channel 在肯定事件间隔内没有数据交互时(即处于 idle 状态), 就会触发指定的事件。

该类能够对三种类型的超时做心跳机制检测:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
  • readerIdleTimeSeconds:设置读超时工夫;
  • writerIdleTimeSeconds:设置写超时工夫;
  • allIdleTimeSeconds:同时为读或写设置超时工夫;

上面咱们还是通过一个例子来解说IdleStateHandler的应用。

服务端:

public class HeartBeatServer {
    private int port;

    public HeartBeatServer(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new HeartBeatServerChannelInitializer());

        try {
            ChannelFuture future = server.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        HeartBeatServer server = new HeartBeatServer(7788);
        server.start();
    }
}

服务端Initializer:

public class HeartBeatServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast("handler",new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast(new HeartBeatServerHandler());
    }
}

在这里IdleStateHandler也是handler的一种,所以退出addLast。咱们别离设置4个参数:读超时工夫为3s,写超时和读写超时为0,而后退出工夫管制单元。

服务端handler:

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
    private int loss_connect_time = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "Server :" + msg.toString());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            //服务端对应着读事件,当为READER_IDLE时触发
                IdleStateEvent event = (IdleStateEvent)evt;
            if(event.state() == IdleState.READER_IDLE){
                loss_connect_time++;
                System.out.println("接管音讯超时");
                if(loss_connect_time > 2){
                    System.out.println("敞开不流动的链接");
                    ctx.channel().close();
                }
            }else{
                super.userEventTriggered(ctx,evt);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

咱们看到在handler中调用了userEventTriggered办法,IdleStateEvent的state()办法一个有三个值:
READER_IDLE,WRITER_IDLE,ALL_IDLE。正好对应读事件写事件和读写事件。

再来写一下客户端:

public class HeartBeatsClient {
    private  int port;
    private  String address;

    public HeartBeatsClient(int port, String address) {
        this.port = port;
        this.address = address;
    }

    public void start(){
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new HeartBeatsClientChannelInitializer());

        try {
            ChannelFuture future = bootstrap.connect(address,port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        HeartBeatsClient client = new HeartBeatsClient(7788,"127.0.0.1");
        client.start();
    }
}

客户端Initializer:

public class HeartBeatsClientChannelInitializer extends  ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast("handler", new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast(new HeartBeatClientHandler());
    }
}

这里咱们设置了IdleStateHandler的写超时为3秒,客户端执行的动作为写音讯到服务端,服务端执行读动作。

客户端handler:

public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF_8));

    private static final int TRY_TIMES = 3;

    private int currentTime = 0;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("激活工夫是:"+new Date());
        System.out.println("链接曾经激活");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("进行工夫是:"+new Date());
        System.out.println("敞开链接");
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("以后轮询工夫:"+new Date());
        if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                if(currentTime <= TRY_TIMES){
                    System.out.println("currentTime:"+currentTime);
                    currentTime++;
                    ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
                }
            }
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println(message);
        if (message.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

启动服务端和客户端咱们看到输入为:

咱们再来屡一下思路:

  1. 首先客户端激活channel,因为客户端中并没有发送音讯所以会触发客户端的IdleStateHandler,它设置的写超时工夫为3s;
  2. 而后触发客户端的事件机制进入userEventTriggered办法,在触发器中计数并向客户端发送音讯;
  3. 服务端接管音讯;
  4. 客户端触发器持续轮询发送音讯,直到计数器满不再向服务端发送音讯;
  5. 服务端在IdleStateHandler设置的读音讯超时工夫5s内未收到音讯,触发了服务端中handler的userEventTriggered办法,于是敞开客户端的链接。

大体咱们的简略心跳机制就是这样的思路,通过事件触发机制以及计数器的形式来实现,下面咱们的案例中最初客户端没有发送音讯的时候咱们是强制断开了客户端的链接,那么既然能够敞开,咱们是不是也可是从新链接客户端呢?因为万一客户端自身并不想敞开而是因为别的起因导致他无奈与服务端通信。上面咱们来说一下重连机制。

当咱们的服务端在未读到客户端音讯超时而敞开客户端的时候咱们个别在客户端的finally块中方的是敞开客户端的代码,这时咱们能够做一下批改的,finally是肯定会被执行新的,所以咱们能够在finally块中从新调用一下启动客户端的代码,这样就又重新启动了客户端了,上客户端代码:

/**
 * 本Client为测试netty重连机制
 * Server端代码都一样,所以不做批改
 * 只用在client端中做一下判断即可
 */
public class HeartBeatsClient2 {

    private  int port;
    private  String address;
    ChannelFuture future;

    public HeartBeatsClient2(int port, String address) {
        this.port = port;
        this.address = address;
    }

    public void start(){
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new HeartBeatsClientChannelInitializer());

        try {
            future = bootstrap.connect(address,port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //group.shutdownGracefully();
            if (null != future) {
                if (future.channel() != null && future.channel().isOpen()) {
                    future.channel().close();
                }
            }
            System.out.println("筹备重连");
            start();
            System.out.println("重连胜利");
        }

    }

    public static void main(String[] args) {
        HeartBeatsClient2 client = new HeartBeatsClient2(7788,"127.0.0.1");
        client.start();
    }
}

其余部分的代码与下面的实例并无异同,只需革新客户端即可,咱们再运行服务端和客户端会看到客户端尽管被敞开了,然而立马又被重启:

当然生产级别的代码应该不是这样实现的吧,哈哈。

近期热文举荐:

1.1,000+ 道 Java面试题及答案整顿(2021最新版)

2.终于靠开源我的项目弄到 IntelliJ IDEA 激活码了,真香!

3.阿里 Mock 工具正式开源,干掉市面上所有 Mock 工具!

4.Spring Cloud 2020.0.0 正式公布,全新颠覆性版本!

5.《Java开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞+转发哦!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理