简介
上一节咱们解说了netty中的Channel,晓得了channel是事件处理器和内部联通的桥梁。明天本文将会具体解说netty的剩下几个十分总要的局部Event、Handler和PipeLine。
ChannelPipeline
pipeLine是连贯Channel和handler的桥梁,它实际上是一个filter的实现,用于管制其中handler的解决形式。
当一个channel被创立的时候,和它对应的ChannelPipeline也会被创立。
先看下ChannelPipeline的定义:
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable
首先ChannelPipeline继承自Iterable,示意它是可遍历的,而遍历的后果就是其中一个个的Handler。
作为一个合格的Iterable,ChannelPipeline提供了一系列的add和remote办法,通过这些办法就能够向ChannelPipeline中增加或者移除Handler。因为ChannelPipeline是一个filter,而过滤器是须要指定对应的filter的程序的,所以ChannelPipeline中有addFirst和addLast这种增加不同程序的办法。
而后能够看到ChannelPipeline继承了ChannelInboundInvoker和ChannelOutboundInvoker两个接口。
先看一张channelPipeline的工作流程图:
能够看出ChannelPipeline次要有两种操作,一种是读入Inbound,一种是写出OutBound。
对于Socket.read()这样的读入操作,调用的实际上就是ChannelInboundInvoker中的办法。对于内部的IO写入的申请,调用的就是ChannelOutboundInvoker中的办法。
留神,Inbound和outbound的解决程序是相同的,比方上面的例子:
ChannelPipeline p = ...; p.addLast("1", new InboundHandlerA()); p.addLast("2", new InboundHandlerB()); p.addLast("3", new OutboundHandlerA()); p.addLast("4", new OutboundHandlerB()); p.addLast("5", new InboundOutboundHandlerX());
下面的代码中咱们向ChannelPipeline增加了5个handler,其中2个InboundHandler,2个OutboundHandler和一个同时解决In和Out的Handler。
那么当channel遇到inbound event的时候,就会依照1,2,3,4,5的程序进行解决,然而只有InboundHandler能力解决Inbound事件,所以,真正执行的程序是1,2,5。
同样的当channel遇到outbound event的时候,会依照5,4,3,2,1的程序进行执行,然而只有outboundHandler能力解决Outbound事件,所以真正执行的程序是5,4,3.
简略的说,ChannelPipeline指定了Handler的执行程序。
ChannelHandler
netty是一个事件驱动的框架,所有的event都是由Handler来进行解决的。ChannelHandler能够解决IO、拦挡IO或者将event传递给ChannelPipeline中的下一个Handler进行解决。
ChannelHandler的构造很简略,只有三个办法,别离是:
void handlerAdded(ChannelHandlerContext ctx) throws Exception;void handlerRemoved(ChannelHandlerContext ctx) throws Exception;void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
依据inbound和outbound事件的不同,ChannelHandler能够分为两类,别离是ChannelInboundHandler 和ChannelOutboundHandler.
因为这两个都是interface,实现起来比拟麻烦,所以netty为大家提供了三个默认的实现:ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler。后面两个很好了解,别离是inbound和outbound,最初一个能够同时解决inbound和outbound。
ChannelHandler是由ChannelHandlerContext提供的,并且和ChannelPipeline的交互也是通过ChannelHandlerContext来进行的。
ChannelHandlerContext
ChannelHandlerContext能够让ChannelHandler和ChannelPipeline或者其余的Handler进行交互。它就是一个上下文环境,使得Handler和Channel能够相互作用。
如能够在ChannelHandlerContext中,调用channel()取得绑定的channel。能够通过调用handler()取得绑定的Handler。通过调用fire*办法来触发Channel的事件。
看下ChannelHandlerContext的定义:
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker
能够看到他是一个AttributeMap用来存储属性,还是一个ChannelInboundInvoker和ChannelOutboundInvoker用来触发和流传相应的事件。
对于Inbound来说流传事件的办法有:
ChannelHandlerContext.fireChannelRegistered()ChannelHandlerContext.fireChannelActive()ChannelHandlerContext.fireChannelRead(Object)ChannelHandlerContext.fireChannelReadComplete()ChannelHandlerContext.fireExceptionCaught(Throwable)ChannelHandlerContext.fireUserEventTriggered(Object)ChannelHandlerContext.fireChannelWritabilityChanged()ChannelHandlerContext.fireChannelInactive()ChannelHandlerContext.fireChannelUnregistered()
对于Outbound来说流传事件的办法有:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)ChannelHandlerContext.write(Object, ChannelPromise)ChannelHandlerContext.flush()ChannelHandlerContext.read()ChannelHandlerContext.disconnect(ChannelPromise)ChannelHandlerContext.close(ChannelPromise)ChannelHandlerContext.deregister(ChannelPromise)
这些办法,在一个Handler中调用,而后将事件传递给下一个Handler,如下所示:
public class MyInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("Connected!"); ctx.fireChannelActive(); } } public class MyOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) { System.out.println("Closing .."); ctx.close(promise); } }
ChannelHandler中的状态变量
ChannelHandler是一个Handler类,个别状况下,这个类的实例是能够被多个channel独特应用的,前提是这个ChannelHandler没有共享的状态变量。
但有时候,咱们必须要在ChannelHandler中放弃一个状态,那么就波及到ChannelHandler中的状态变量的问题,看上面的一个例子:
public interface Message { // your methods here } public class DataServerHandler extends SimpleChannelInboundHandler<Message> { private boolean loggedIn; @Override public void channelRead0(ChannelHandlerContext ctx, Message message) { if (message instanceof LoginMessage) { authenticate((LoginMessage) message); loggedIn = true; } else (message instanceof GetDataMessage) { if (loggedIn) { ctx.writeAndFlush(fetchSecret((GetDataMessage) message)); } else { fail(); } } } ... }
这个例子中,咱们须要在收到LoginMessage之后,对音讯进行认证,并保留认证状态,因为业务逻辑是这样的,所以必须要有一个状态变量。
那么这样带有状态变量的Handler就只能绑定一个channel,如果绑定多个channel就有可能呈现状态不统一的问题。一个channel绑定一个Handler实例,很简略,只须要在initChannel办法中应用new关键字新建一个对象即可。
public class DataServerInitializer extends ChannelInitializer<Channel> { @Override public void initChannel(Channel channel) { channel.pipeline().addLast("handler", new DataServerHandler()); } }
那么除了新建handler实例之外,还有没有其余的方法呢?当然是有的,那就是 ChannelHandlerContext 中的AttributeKey属性。还是下面的例子,咱们看一下应用AttributeKey应该怎么实现:
public interface Message { // your methods here } @Sharable public class DataServerHandler extends SimpleChannelInboundHandler<Message> { private final AttributeKey<Boolean> auth = AttributeKey.valueOf("auth"); @Override public void channelRead(ChannelHandlerContext ctx, Message message) { Attribute<Boolean> attr = ctx.attr(auth); if (message instanceof LoginMessage) { authenticate((LoginMessage) o); attr.set(true); } else (message instanceof GetDataMessage) { if (Boolean.TRUE.equals(attr.get())) { ctx.writeAndFlush(fetchSecret((GetDataMessage) o)); } else { fail(); } } } ... }
上例中,首先定义了一个AttributeKey,而后应用ChannelHandlerContext的attr办法将Attribute设置到ChannelHandlerContext中,这样该Attribute绑定到这个ChannelHandlerContext中了。后续即便应用同一个Handler在不同的Channel中该属性也是不同的。
上面是应用共享Handler的例子:
public class DataServerInitializer extends ChannelInitializer<Channel> { private static final DataServerHandler SHARED = new DataServerHandler(); @Override public void initChannel(Channel channel) { channel.pipeline().addLast("handler", SHARED); } }
留神,在定义DataServerHandler的时候,咱们加上了@Sharable注解,如果一个ChannelHandler应用了@Sharable注解,那就意味着你能够只创立一次这个Handler,然而能够将其绑定到一个或者多个ChannelPipeline中。
留神,@Sharable注解是为java文档筹备的,并不会影响到理论的代码执行成果。
异步Handler
之前介绍了,能够通过调用pipeline.addLast办法将handler退出到pipeline中,因为pipeline是一个filter的构造,所以退出的handler是程序进行解决的。
然而,我心愿某些handler是在新的线程中执行该怎么办?如果咱们心愿这些新的线程中执行的Handler是无序的又该怎么办?
比方咱们当初有3个handler别离是MyHandler1,MyHandler2和MyHandler3。
程序执行的写法是这样的:
ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("MyHandler1", new MyHandler1()); pipeline.addLast("MyHandler2", new MyHandler2()); pipeline.addLast("MyHandler3", new MyHandler3());
如果要让MyHandler3在新的线程中执行,则能够退出group选项,从而让handler在新的group中运行:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("MyHandler1", new MyHandler1()); pipeline.addLast("MyHandler2", new MyHandler2()); pipeline.addLast(group,"MyHandler3", new MyHandler3());
然而上例中DefaultEventExecutorGroup退出的Handler也是会程序执行的,如果的确不想程序执行,那么能够尝试思考应用UnorderedThreadPoolEventExecutor 。
总结
本文解说了Event、Handler和PipeLine,并举例说明他们之间的关系和相互作用。后续会从netty的具体实际登程,进一步加深对netty的了解和利用,心愿大家可能喜爱。
本文已收录于 http://www.flydean.com/05-netty-channelevent/
最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!
欢送关注我的公众号:「程序那些事」,懂技术,更懂你!