乐趣区

关于java:netty系列之channel和channelGroup

简介

channel 是 netty 中数据传输和数据处理的渠道,也是 netty 程序中不可或缺的一环。在 netty 中 channel 是一个接口,针对不同的数据类型或者协定 channel 会有具体的不同实现。

尽管 channel 很重要,然而在代码中的确很神秘,基本上咱们很少可能看到间接应用 channel 的状况,那么事实真的如此吗?和 channel 相干的 ChannelGroup 又有什么作用呢?一起来看看吧。

神龙见首不见尾的 channel

其实 netty 的代码是有固定的模板的,首先依据是 server 端还是 client 端,而后创立对应的 Bootstrap 和 ServerBootstrap。而后给这个 Bootstrap 配置对应的 group 办法。而后为 Bootstrap 配置 channel 和 handler, 最初启动 Bootstrap 即可。

这样一个规范的 netty 程序就实现了。你须要做的就是为其筛选适合的 group、channel 和 handler。

咱们先看一个最简略的 NioServerSocketChannel 的状况:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChatServerInitializer());

            b.bind(PORT).sync().channel().closeFuture().sync();
        } finally {bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}

这里,咱们将 NioServerSocketChannel 设置为 ServerBootstrap 的 channel。

这样就完了吗?channel 到底是在哪里用到的呢?

别急,咱们认真看一下 try block 中的最初一句:

b.bind(PORT).sync().channel().closeFuture().sync();

b.bind(PORT).sync() 实际上返回了一个 ChannelFuture 对象,通过调用它的 channel 办法,就返回了和它关联的 Channel 对象。

而后咱们调用了 channel.closeFuture() 办法。closeFuture 办法会返回一个 ChannelFuture 对象,这个对象将会在 channel 敞开的时候收到告诉。

而 sync 办法会实现同步阻塞,始终等到 channel 敞开为止,从而进行后续的 eventGroup 的 shutdown 操作。

在 ServerBootstrap 中构建模板中,channel 其实有两个作用,第一个作用是指定 ServerBootstrap 的 channel,第二个作用就是通过 channel 获取到 channel 敞开的事件,最终敞开整个 netty 程序。

尽管咱们基本上看不到 channel 的间接办法调用,然而 channel 毋庸置疑,它就是 netty 的灵魂。

接下来咱们再看一下具体音讯解决的 handler 的基本操作:

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // channel 沉闷
        ctx.write("Channel Active 状态!\r\n");
        ctx.flush();}

通常如果须要在 handler 中向 channel 写入数据,咱们调用的是 ChannelHandlerContext 的 write 办法。这个办法和 channel 有什么关系呢?

首先 write 办法是 ChannelOutboundInvoker 接口中的办法,而 ChannelHandlerContext 和 Channel 都继承了 ChannelOutboundInvoker 接口,也就是说,ChannelHandlerContext 和 Channel 都有 write 办法:

ChannelFuture write(Object msg);

因为这里咱们应用的是 NioServerSocketChannel,所以咱们来具体看一下 NioServerSocketChannel 中 write 的实现。

通过查看代码咱们会发现 NioServerSocketChannel 继承自 AbstractNioMessageChannel,AbstractNioMessageChannel 继承自 AbstractNioChannel,AbstractNioChannel 继承自 AbstractChannel, 而这个 write 办法就是 AbstractChannel 中实现的:

    public ChannelFuture write(Object msg) {return pipeline.write(msg);
    }

Channel 的 write 办法,实际上调用了 pipeline 的 write 办法。上面是 pipeLine 中的 write 办法:

    public final ChannelFuture write(Object msg) {return tail.write(msg);
    }

这里的 tail 是一个 AbstractChannelHandlerContext 对象。

这样咱们就得出了这样的论断:channel 中的 write 办法最终实际上调用的是 ChannelHandlerContext 中的 write 办法。

所以下面的:

ctx.write("Channel Active 状态!\r\n");

实际上能够间接从 channel 中调用:

Channel ch = b.bind(0).sync().channel();

// 将音讯写入 channel 中
ch.writeAndFlush("Channel Active 状态!\r\n").sync();

channel 和 channelGroup

channel 是 netty 的灵魂,对于 Bootstrap 来说,要获取到对应的 channel,能够通过调用:

b.bind(PORT).sync().channel()

来失去,从下面代码中咱们也能够看到一个 Bootstrap 只会对应一个 channel。

channel 中有一个 parent() 办法,用来返回它的父 channel,所以 channel 是有层级构造的,

咱们再来看一下 channelGroup 的定义:

public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> 

能够看到 ChannelGroup 实际上是 Channel 的汇合。ChannelGroup 用来将相似的 Channel 构建成汇合,从而能够对多个 channel 进行对立的治理。

能够能有小伙伴要问了,一个 Bootstrap 不是只对应一个 channel 吗?那么哪里来的 channel 的汇合?

事实上,在一些简单的程序中,咱们可能启动多个 Bootstrap 来解决不同的业务,所以相应的就会有多个 channel。

如果创立的 channel 过多,并且这些 channel 又是很同质化的时候,就有需要对这些 channel 进行对立治理。这时候就须要用到 channelGroup 了。

channelGroup 的根本应用

先看下 channelGroup 的根本应用,首先是创立一个 channelGroup:

ChannelGroup recipients =
           new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

有了 channelGroup 之后,能够调用 add 办法,向其中增加不同的 channel:

   recipients.add(channelA);
   recipients.add(channelB);

并且还能够对立向这些 channel 中发送音讯:

recipients.write(Unpooled.copiedBuffer(
           "这是从 channelGroup 中收回的对立音讯.",
           CharsetUtil.UTF_8));

基本上 channelGroup 提供了 write,flush,flushAndWrite,writeAndFlush,disconnect,close,newCloseFuture 等性能,用于对汇合中的 channel 进行对立治理。

如果你有多个 channel,那么能够思考应用 channelGroup。

另外 channelGroup 还有一些个性,咱们来具体理解一下。

将敞开的 channel 主动移出

ChannelGroup 是一个 channel 的汇合,当然咱们只心愿保留 open 状态的 channel,如果是 close 状态的 channel,还要手动从 ChannelGroup 中移出的话切实是太麻烦了。

所以在 ChannelGroup 中,如果一个 channel 被敞开了,那么它会主动从 ChannelGroup 中移出,这个性能是怎么实现的呢?

先来看下 channelGroup 的 add 办法:

   public boolean add(Channel channel) {
        ConcurrentMap<ChannelId, Channel> map =
            channel instanceof ServerChannel? serverChannels : nonServerChannels;

        boolean added = map.putIfAbsent(channel.id(), channel) == null;
        if (added) {channel.closeFuture().addListener(remover);
        }

        if (stayClosed && closed) {channel.close();
        }

        return added;
    }

能够看到,在 add 办法中,为 channel 辨别了是 server channel 还是非 server channel。而后依据 channel id 将其存入 ConcurrentMap 中。

如果增加胜利,则给 channel 增加了一个 closeFuture 的回调。当 channel 被敞开的时候,会去调用这个 remover 办法:

private final ChannelFutureListener remover = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {remove(future.channel());
        }
    };

remover 办法会将 channel 从 serverChannels 或者 nonServerChannels 中移出。从而保障 ChannelGroup 中只保留 open 状态的 channel。

同时敞开 serverChannel 和 acceptedChannel

尽管 ServerBootstrap 的 bind 办法只会返回一个 channel,然而对于 server 来说,能够有多个 worker EventLoopGroup, 所以当客户端和服务器端建设连贯之后建设的 accepted Channel 是 server channel 的子 channel。

也就是说一个服务器端有一个 server channel 和多个 accepted channel。

那么如果咱们想要同时敞开这些 channel 的话,就能够应用 ChannelGroup 的 close 办法。

因为如果 Server channel 和非 Server channel 在同一个 ChannelGroup 的话,所有的 IO 命令都会先发给 server channel,而后才会发给非 server channel。

所以咱们能够将 Server channel 和非 Server channel 通通退出同一个 ChannelGroup 中,在程序的最初,对立调用 ChannelGroup 的 close 办法,从而达到该目标:

   ChannelGroup allChannels =
           new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  
   public static void main(String[] args) throws Exception {ServerBootstrap b = new ServerBootstrap(..);
       ...
       b.childHandler(new MyHandler());
  
       // 启动服务器
       b.getPipeline().addLast("handler", new MyHandler());
       Channel serverChannel = b.bind(..).sync();
       allChannels.add(serverChannel);
  
       ... 期待 shutdown 指令 ...
  
       // 敞开 serverChannel 和所有的 accepted connections.
       allChannels.close().awaitUninterruptibly();
   }
  
   public class MyHandler extends ChannelInboundHandlerAdapter {
        @Override
       public void channelActive(ChannelHandlerContext ctx) {
           // 将 accepted channel 增加到 allChannels 中
           allChannels.add(ctx.channel());
           super.channelActive(ctx);
       }
   }

ChannelGroupFuture

另外,和 channel 一样,channelGroup 的操作都是异步的,它会返回一个 ChannelGroupFuture 对象。

咱们看下 ChannelGroupFuture 的定义:

public interface ChannelGroupFuture extends Future<Void>, Iterable<ChannelFuture>

能够看到 ChannelGroupFuture 是一个 Future,同时它也是一个 ChannelFuture 的遍历器,能够遍历 ChannelGroup 中所有 channel 返回的 ChannelFuture。

同时 ChannelGroupFuture 提供了 isSuccess,isPartialSuccess,isPartialFailure 等办法判断命令是否局部胜利。

ChannelGroupFuture 还提供了 addListener 办法用来监听具体的事件。

总结

channel 是 netty 的外围,当咱们有多个 channel 不便进行治理的时候,就能够应用 channelGroup 进行对立治理。

本文已收录于 http://www.flydean.com/04-1-netty-channel-group/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!

退出移动版