简介

channel是netty中数据传输和数据处理的渠道,也是netty程序中不可或缺的一环。在netty中channel是一个接口,针对不同的数据类型或者协定channel会有具体的不同实现。

尽管channel很重要,然而在代码中的确很神秘,基本上咱们很少可能看到间接应用channel的状况,那么事实真的如此吗?和channel相干的ChannelGroup又有什么作用呢?一起来看看吧。

神龙见首不见尾的channel

其实netty的代码是有固定的模板的,首先依据是server端还是client端,而后创立对应的Bootstrap和ServerBootstrap。而后给这个Bootstrap配置对应的group办法。而后为Bootstrap配置channel和handler,最初启动Bootstrap即可。

这样一个规范的netty程序就实现了。你须要做的就是为其筛选适合的group、channel和handler。

咱们先看一个最简略的NioServerSocketChannel的状况:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)             .channel(NioServerSocketChannel.class)             .handler(new LoggingHandler(LogLevel.INFO))             .childHandler(new ChatServerInitializer());            b.bind(PORT).sync().channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }

这里,咱们将NioServerSocketChannel设置为ServerBootstrap的channel。

这样就完了吗?channel到底是在哪里用到的呢?

别急,咱们认真看一下try block中的最初一句:

b.bind(PORT).sync().channel().closeFuture().sync();

b.bind(PORT).sync()实际上返回了一个ChannelFuture对象,通过调用它的channel办法,就返回了和它关联的Channel对象。

而后咱们调用了channel.closeFuture()办法。closeFuture办法会返回一个ChannelFuture对象,这个对象将会在channel敞开的时候收到告诉。

而sync办法会实现同步阻塞,始终等到channel敞开为止,从而进行后续的eventGroup的shutdown操作。

在ServerBootstrap中构建模板中,channel其实有两个作用,第一个作用是指定ServerBootstrap的channel,第二个作用就是通过channel获取到channel敞开的事件,最终敞开整个netty程序。

尽管咱们基本上看不到channel的间接办法调用,然而channel毋庸置疑,它就是netty的灵魂。

接下来咱们再看一下具体音讯解决的handler的基本操作:

    public void channelActive(ChannelHandlerContext ctx) throws Exception {        // channel沉闷        ctx.write("Channel Active状态!\r\n");        ctx.flush();    }

通常如果须要在handler中向channel写入数据,咱们调用的是ChannelHandlerContext的write办法。这个办法和channel有什么关系呢?

首先write办法是ChannelOutboundInvoker接口中的办法,而ChannelHandlerContext和Channel都继承了ChannelOutboundInvoker接口,也就是说,ChannelHandlerContext和Channel都有write办法:

ChannelFuture write(Object msg);

因为这里咱们应用的是NioServerSocketChannel,所以咱们来具体看一下NioServerSocketChannel中write的实现。

通过查看代码咱们会发现NioServerSocketChannel继承自AbstractNioMessageChannel,AbstractNioMessageChannel继承自AbstractNioChannel,AbstractNioChannel继承自AbstractChannel,而这个write办法就是AbstractChannel中实现的:

    public ChannelFuture write(Object msg) {        return pipeline.write(msg);    }

Channel的write办法,实际上调用了pipeline的write办法。上面是pipeLine中的write办法:

    public final ChannelFuture write(Object msg) {        return tail.write(msg);    }

这里的tail是一个AbstractChannelHandlerContext对象。

这样咱们就得出了这样的论断:channel中的write办法最终实际上调用的是ChannelHandlerContext中的write办法。

所以下面的:

ctx.write("Channel Active状态!\r\n");

实际上能够间接从channel中调用:

Channel ch = b.bind(0).sync().channel();// 将音讯写入channel中ch.writeAndFlush("Channel Active状态!\r\n").sync();

channel和channelGroup

channel是netty的灵魂,对于Bootstrap来说,要获取到对应的channel,能够通过调用:

b.bind(PORT).sync().channel()

来失去,从下面代码中咱们也能够看到一个Bootstrap只会对应一个channel。

channel中有一个parent()办法,用来返回它的父channel,所以channel是有层级构造的,

咱们再来看一下channelGroup的定义:

public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> 

能够看到ChannelGroup实际上是Channel的汇合。ChannelGroup用来将相似的Channel构建成汇合,从而能够对多个channel进行对立的治理。

能够能有小伙伴要问了,一个Bootstrap不是只对应一个channel吗?那么哪里来的channel的汇合?

事实上,在一些简单的程序中,咱们可能启动多个Bootstrap来解决不同的业务,所以相应的就会有多个channel。

如果创立的channel过多,并且这些channel又是很同质化的时候,就有需要对这些channel进行对立治理。这时候就须要用到channelGroup了。

channelGroup的根本应用

先看下channelGroup的根本应用,首先是创立一个channelGroup:

ChannelGroup recipients =           new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

有了channelGroup之后,能够调用add办法,向其中增加不同的channel:

   recipients.add(channelA);   recipients.add(channelB);

并且还能够对立向这些channel中发送音讯:

recipients.write(Unpooled.copiedBuffer(           "这是从channelGroup中收回的对立音讯.",           CharsetUtil.UTF_8));

基本上channelGroup提供了write,flush,flushAndWrite,writeAndFlush,disconnect,close,newCloseFuture等性能,用于对汇合中的channel进行对立治理。

如果你有多个channel,那么能够思考应用channelGroup。

另外channelGroup还有一些个性,咱们来具体理解一下。

将敞开的channel主动移出

ChannelGroup是一个channel的汇合,当然咱们只心愿保留open状态的channel,如果是close状态的channel,还要手动从ChannelGroup中移出的话切实是太麻烦了。

所以在ChannelGroup中,如果一个channel被敞开了,那么它会主动从ChannelGroup中移出,这个性能是怎么实现的呢?

先来看下channelGroup的add办法:

   public boolean add(Channel channel) {        ConcurrentMap<ChannelId, Channel> map =            channel instanceof ServerChannel? serverChannels : nonServerChannels;        boolean added = map.putIfAbsent(channel.id(), channel) == null;        if (added) {            channel.closeFuture().addListener(remover);        }        if (stayClosed && closed) {            channel.close();        }        return added;    }

能够看到,在add办法中,为channel辨别了是server channel还是非server channel。而后依据channel id将其存入ConcurrentMap中。

如果增加胜利,则给channel增加了一个closeFuture的回调。当channel被敞开的时候,会去调用这个remover办法:

private final ChannelFutureListener remover = new ChannelFutureListener() {        @Override        public void operationComplete(ChannelFuture future) throws Exception {            remove(future.channel());        }    };

remover办法会将channel从serverChannels或者nonServerChannels中移出。从而保障ChannelGroup中只保留open状态的channel。

同时敞开serverChannel和acceptedChannel

尽管 ServerBootstrap的bind办法只会返回一个channel,然而对于server来说,能够有多个worker EventLoopGroup,所以当客户端和服务器端建设连贯之后建设的accepted Channel是server channel的子channel。

也就是说一个服务器端有一个server channel和多个accepted channel。

那么如果咱们想要同时敞开这些channel的话, 就能够应用ChannelGroup的close办法。

因为如果Server channel和非Server channel在同一个ChannelGroup的话,所有的IO命令都会先发给server channel,而后才会发给非server channel。

所以咱们能够将Server channel和非Server channel通通退出同一个ChannelGroup中,在程序的最初,对立调用ChannelGroup的close办法,从而达到该目标:

   ChannelGroup allChannels =           new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);     public static void main(String[] args) throws Exception {       ServerBootstrap b = new ServerBootstrap(..);       ...       b.childHandler(new MyHandler());         // 启动服务器       b.getPipeline().addLast("handler", new MyHandler());       Channel serverChannel = b.bind(..).sync();       allChannels.add(serverChannel);         ... 期待shutdown指令 ...         // 敞开serverChannel 和所有的 accepted connections.       allChannels.close().awaitUninterruptibly();   }     public class MyHandler extends ChannelInboundHandlerAdapter {        @Override       public void channelActive(ChannelHandlerContext ctx) {           // 将accepted channel增加到allChannels中           allChannels.add(ctx.channel());           super.channelActive(ctx);       }   }

ChannelGroupFuture

另外,和channel一样,channelGroup的操作都是异步的,它会返回一个ChannelGroupFuture对象。

咱们看下ChannelGroupFuture的定义:

public interface ChannelGroupFuture extends Future<Void>, Iterable<ChannelFuture>

能够看到ChannelGroupFuture是一个Future,同时它也是一个ChannelFuture的遍历器,能够遍历ChannelGroup中所有channel返回的ChannelFuture。

同时ChannelGroupFuture提供了isSuccess,isPartialSuccess,isPartialFailure等办法判断命令是否局部胜利。

ChannelGroupFuture还提供了addListener办法用来监听具体的事件。

总结

channel是netty的外围,当咱们有多个channel不便进行治理的时候,就能够应用channelGroup进行对立治理。

本文已收录于 http://www.flydean.com/04-1-netty-channel-group/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」,懂技术,更懂你!