简介

上一节咱们解说了netty中的Channel,晓得了channel是事件处理器和内部联通的桥梁。明天本文将会具体解说netty的剩下几个十分总要的局部Event、Handler和PipeLine。

ChannelPipeline

pipeLine是连贯Channel和handler的桥梁,它实际上是一个filter的实现,用于管制其中handler的解决形式。

当一个channel被创立的时候,和它对应的ChannelPipeline也会被创立。

先看下ChannelPipeline的定义:

public interface ChannelPipeline        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable 

首先ChannelPipeline继承自Iterable,示意它是可遍历的,而遍历的后果就是其中一个个的Handler。

作为一个合格的Iterable,ChannelPipeline提供了一系列的add和remote办法,通过这些办法就能够向ChannelPipeline中增加或者移除Handler。因为ChannelPipeline是一个filter,而过滤器是须要指定对应的filter的程序的,所以ChannelPipeline中有addFirst和addLast这种增加不同程序的办法。

而后能够看到ChannelPipeline继承了ChannelInboundInvoker和ChannelOutboundInvoker两个接口。

先看一张channelPipeline的工作流程图:

能够看出ChannelPipeline次要有两种操作,一种是读入Inbound,一种是写出OutBound。

对于Socket.read()这样的读入操作,调用的实际上就是ChannelInboundInvoker中的办法。对于内部的IO写入的申请,调用的就是ChannelOutboundInvoker中的办法。

留神,Inbound和outbound的解决程序是相同的,比方上面的例子:

    ChannelPipeline p = ...;   p.addLast("1", new InboundHandlerA());   p.addLast("2", new InboundHandlerB());   p.addLast("3", new OutboundHandlerA());   p.addLast("4", new OutboundHandlerB());   p.addLast("5", new InboundOutboundHandlerX());

下面的代码中咱们向ChannelPipeline增加了5个handler,其中2个InboundHandler,2个OutboundHandler和一个同时解决In和Out的Handler。

那么当channel遇到inbound event的时候,就会依照1,2,3,4,5的程序进行解决,然而只有InboundHandler能力解决Inbound事件,所以,真正执行的程序是1,2,5。

同样的当channel遇到outbound event的时候,会依照5,4,3,2,1的程序进行执行,然而只有outboundHandler能力解决Outbound事件,所以真正执行的程序是5,4,3.

简略的说,ChannelPipeline指定了Handler的执行程序。

ChannelHandler

netty是一个事件驱动的框架,所有的event都是由Handler来进行解决的。ChannelHandler能够解决IO、拦挡IO或者将event传递给ChannelPipeline中的下一个Handler进行解决。

ChannelHandler的构造很简略,只有三个办法,别离是:

void handlerAdded(ChannelHandlerContext ctx) throws Exception;void handlerRemoved(ChannelHandlerContext ctx) throws Exception;void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

依据inbound和outbound事件的不同,ChannelHandler能够分为两类,别离是ChannelInboundHandler 和ChannelOutboundHandler.

因为这两个都是interface,实现起来比拟麻烦,所以netty为大家提供了三个默认的实现:ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler。后面两个很好了解,别离是inbound和outbound,最初一个能够同时解决inbound和outbound。

ChannelHandler是由ChannelHandlerContext提供的,并且和ChannelPipeline的交互也是通过ChannelHandlerContext来进行的。

ChannelHandlerContext

ChannelHandlerContext能够让ChannelHandler和ChannelPipeline或者其余的Handler进行交互。它就是一个上下文环境,使得Handler和Channel能够相互作用。

如能够在ChannelHandlerContext中,调用channel()取得绑定的channel。能够通过调用handler()取得绑定的Handler。通过调用fire*办法来触发Channel的事件。

看下ChannelHandlerContext的定义:

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker 

能够看到他是一个AttributeMap用来存储属性,还是一个ChannelInboundInvoker和ChannelOutboundInvoker用来触发和流传相应的事件。

对于Inbound来说流传事件的办法有:

ChannelHandlerContext.fireChannelRegistered()ChannelHandlerContext.fireChannelActive()ChannelHandlerContext.fireChannelRead(Object)ChannelHandlerContext.fireChannelReadComplete()ChannelHandlerContext.fireExceptionCaught(Throwable)ChannelHandlerContext.fireUserEventTriggered(Object)ChannelHandlerContext.fireChannelWritabilityChanged()ChannelHandlerContext.fireChannelInactive()ChannelHandlerContext.fireChannelUnregistered()

对于Outbound来说流传事件的办法有:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)ChannelHandlerContext.write(Object, ChannelPromise)ChannelHandlerContext.flush()ChannelHandlerContext.read()ChannelHandlerContext.disconnect(ChannelPromise)ChannelHandlerContext.close(ChannelPromise)ChannelHandlerContext.deregister(ChannelPromise)

这些办法,在一个Handler中调用,而后将事件传递给下一个Handler,如下所示:

   public class MyInboundHandler extends ChannelInboundHandlerAdapter {        @Override       public void channelActive(ChannelHandlerContext ctx) {           System.out.println("Connected!");           ctx.fireChannelActive();       }   }     public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {        @Override       public void close(ChannelHandlerContext ctx, ChannelPromise promise) {           System.out.println("Closing ..");           ctx.close(promise);       }   }

ChannelHandler中的状态变量

ChannelHandler是一个Handler类,个别状况下,这个类的实例是能够被多个channel独特应用的,前提是这个ChannelHandler没有共享的状态变量。

但有时候,咱们必须要在ChannelHandler中放弃一个状态,那么就波及到ChannelHandler中的状态变量的问题,看上面的一个例子:

  public interface Message {       // your methods here   }     public class DataServerHandler extends SimpleChannelInboundHandler<Message> {         private boolean loggedIn;          @Override       public void channelRead0(ChannelHandlerContext ctx, Message message) {           if (message instanceof LoginMessage) {               authenticate((LoginMessage) message);               loggedIn = true;           } else (message instanceof GetDataMessage) {               if (loggedIn) {                   ctx.writeAndFlush(fetchSecret((GetDataMessage) message));               } else {                   fail();               }           }       }       ...   }

这个例子中,咱们须要在收到LoginMessage之后,对音讯进行认证,并保留认证状态,因为业务逻辑是这样的,所以必须要有一个状态变量。

那么这样带有状态变量的Handler就只能绑定一个channel,如果绑定多个channel就有可能呈现状态不统一的问题。一个channel绑定一个Handler实例,很简略,只须要在initChannel办法中应用new关键字新建一个对象即可。

   public class DataServerInitializer extends ChannelInitializer<Channel> {        @Override       public void initChannel(Channel channel) {           channel.pipeline().addLast("handler", new DataServerHandler());       }   }

那么除了新建handler实例之外,还有没有其余的方法呢?当然是有的,那就是 ChannelHandlerContext 中的AttributeKey属性。还是下面的例子,咱们看一下应用AttributeKey应该怎么实现:

   public interface Message {       // your methods here   }      @Sharable   public class DataServerHandler extends SimpleChannelInboundHandler<Message> {       private final AttributeKey<Boolean> auth =             AttributeKey.valueOf("auth");          @Override       public void channelRead(ChannelHandlerContext ctx, Message message) {           Attribute<Boolean> attr = ctx.attr(auth);           if (message instanceof LoginMessage) {               authenticate((LoginMessage) o);               attr.set(true);           } else (message instanceof GetDataMessage) {               if (Boolean.TRUE.equals(attr.get())) {                   ctx.writeAndFlush(fetchSecret((GetDataMessage) o));               } else {                   fail();               }           }       }       ...   }

上例中,首先定义了一个AttributeKey,而后应用ChannelHandlerContext的attr办法将Attribute设置到ChannelHandlerContext中,这样该Attribute绑定到这个ChannelHandlerContext中了。后续即便应用同一个Handler在不同的Channel中该属性也是不同的。

上面是应用共享Handler的例子:

   public class DataServerInitializer extends ChannelInitializer<Channel> {         private static final DataServerHandler SHARED = new DataServerHandler();          @Override       public void initChannel(Channel channel) {           channel.pipeline().addLast("handler", SHARED);       }   }

留神,在定义DataServerHandler的时候,咱们加上了@Sharable注解,如果一个ChannelHandler应用了@Sharable注解,那就意味着你能够只创立一次这个Handler,然而能够将其绑定到一个或者多个ChannelPipeline中。

留神,@Sharable注解是为java文档筹备的,并不会影响到理论的代码执行成果。

异步Handler

之前介绍了,能够通过调用pipeline.addLast办法将handler退出到pipeline中,因为pipeline是一个filter的构造,所以退出的handler是程序进行解决的。

然而,我心愿某些handler是在新的线程中执行该怎么办?如果咱们心愿这些新的线程中执行的Handler是无序的又该怎么办?

比方咱们当初有3个handler别离是MyHandler1,MyHandler2和MyHandler3。

程序执行的写法是这样的:

ChannelPipeline pipeline = ch.pipeline();     pipeline.addLast("MyHandler1", new MyHandler1());   pipeline.addLast("MyHandler2", new MyHandler2());   pipeline.addLast("MyHandler3", new MyHandler3());

如果要让MyHandler3在新的线程中执行,则能够退出group选项,从而让handler在新的group中运行:

static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);ChannelPipeline pipeline = ch.pipeline();     pipeline.addLast("MyHandler1", new MyHandler1());   pipeline.addLast("MyHandler2", new MyHandler2());   pipeline.addLast(group,"MyHandler3", new MyHandler3());

然而上例中DefaultEventExecutorGroup退出的Handler也是会程序执行的,如果的确不想程序执行,那么能够尝试思考应用UnorderedThreadPoolEventExecutor 。

总结

本文解说了Event、Handler和PipeLine,并举例说明他们之间的关系和相互作用。后续会从netty的具体实际登程,进一步加深对netty的了解和利用,心愿大家可能喜爱。

本文已收录于 http://www.flydean.com/05-netty-channelevent/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」,懂技术,更懂你!