简介
上一节咱们解说了netty中的Channel,晓得了channel是事件处理器和内部联通的桥梁。明天本文将会具体解说netty的剩下几个十分总要的局部Event、Handler和PipeLine。
ChannelPipeline
pipeLine是连贯Channel和handler的桥梁,它实际上是一个filter的实现,用于管制其中handler的解决形式。
当一个channel被创立的时候,和它对应的ChannelPipeline也会被创立。
先看下ChannelPipeline的定义:
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable
首先ChannelPipeline继承自Iterable,示意它是可遍历的,而遍历的后果就是其中一个个的Handler。
作为一个合格的Iterable,ChannelPipeline提供了一系列的add和remote办法,通过这些办法就能够向ChannelPipeline中增加或者移除Handler。因为ChannelPipeline是一个filter,而过滤器是须要指定对应的filter的程序的,所以ChannelPipeline中有addFirst和addLast这种增加不同程序的办法。
而后能够看到ChannelPipeline继承了ChannelInboundInvoker和ChannelOutboundInvoker两个接口。
先看一张channelPipeline的工作流程图:
能够看出ChannelPipeline次要有两种操作,一种是读入Inbound,一种是写出OutBound。
对于Socket.read()这样的读入操作,调用的实际上就是ChannelInboundInvoker中的办法。对于内部的IO写入的申请,调用的就是ChannelOutboundInvoker中的办法。
留神,Inbound和outbound的解决程序是相同的,比方上面的例子:
ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
下面的代码中咱们向ChannelPipeline增加了5个handler,其中2个InboundHandler,2个OutboundHandler和一个同时解决In和Out的Handler。
那么当channel遇到inbound event的时候,就会依照1,2,3,4,5的程序进行解决,然而只有InboundHandler能力解决Inbound事件,所以,真正执行的程序是1,2,5。
同样的当channel遇到outbound event的时候,会依照5,4,3,2,1的程序进行执行,然而只有outboundHandler能力解决Outbound事件,所以真正执行的程序是5,4,3.
简略的说,ChannelPipeline指定了Handler的执行程序。
ChannelHandler
netty是一个事件驱动的框架,所有的event都是由Handler来进行解决的。ChannelHandler能够解决IO、拦挡IO或者将event传递给ChannelPipeline中的下一个Handler进行解决。
ChannelHandler的构造很简略,只有三个办法,别离是:
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
依据inbound和outbound事件的不同,ChannelHandler能够分为两类,别离是ChannelInboundHandler 和ChannelOutboundHandler.
因为这两个都是interface,实现起来比拟麻烦,所以netty为大家提供了三个默认的实现:ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler。后面两个很好了解,别离是inbound和outbound,最初一个能够同时解决inbound和outbound。
ChannelHandler是由ChannelHandlerContext提供的,并且和ChannelPipeline的交互也是通过ChannelHandlerContext来进行的。
ChannelHandlerContext
ChannelHandlerContext能够让ChannelHandler和ChannelPipeline或者其余的Handler进行交互。它就是一个上下文环境,使得Handler和Channel能够相互作用。
如能够在ChannelHandlerContext中,调用channel()取得绑定的channel。能够通过调用handler()取得绑定的Handler。通过调用fire*办法来触发Channel的事件。
看下ChannelHandlerContext的定义:
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker
能够看到他是一个AttributeMap用来存储属性,还是一个ChannelInboundInvoker和ChannelOutboundInvoker用来触发和流传相应的事件。
对于Inbound来说流传事件的办法有:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
对于Outbound来说流传事件的办法有:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)
这些办法,在一个Handler中调用,而后将事件传递给下一个Handler,如下所示:
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing ..");
ctx.close(promise);
}
}
ChannelHandler中的状态变量
ChannelHandler是一个Handler类,个别状况下,这个类的实例是能够被多个channel独特应用的,前提是这个ChannelHandler没有共享的状态变量。
但有时候,咱们必须要在ChannelHandler中放弃一个状态,那么就波及到ChannelHandler中的状态变量的问题,看上面的一个例子:
public interface Message {
// your methods here
}
public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
private boolean loggedIn;
@Override
public void channelRead0(ChannelHandlerContext ctx, Message message) {
if (message instanceof LoginMessage) {
authenticate((LoginMessage) message);
loggedIn = true;
} else (message instanceof GetDataMessage) {
if (loggedIn) {
ctx.writeAndFlush(fetchSecret((GetDataMessage) message));
} else {
fail();
}
}
}
...
}
这个例子中,咱们须要在收到LoginMessage之后,对音讯进行认证,并保留认证状态,因为业务逻辑是这样的,所以必须要有一个状态变量。
那么这样带有状态变量的Handler就只能绑定一个channel,如果绑定多个channel就有可能呈现状态不统一的问题。一个channel绑定一个Handler实例,很简略,只须要在initChannel办法中应用new关键字新建一个对象即可。
public class DataServerInitializer extends ChannelInitializer<Channel> {
@Override
public void initChannel(Channel channel) {
channel.pipeline().addLast("handler", new DataServerHandler());
}
}
那么除了新建handler实例之外,还有没有其余的方法呢?当然是有的,那就是 ChannelHandlerContext 中的AttributeKey属性。还是下面的例子,咱们看一下应用AttributeKey应该怎么实现:
public interface Message {
// your methods here
}
@Sharable
public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
private final AttributeKey<Boolean> auth =
AttributeKey.valueOf("auth");
@Override
public void channelRead(ChannelHandlerContext ctx, Message message) {
Attribute<Boolean> attr = ctx.attr(auth);
if (message instanceof LoginMessage) {
authenticate((LoginMessage) o);
attr.set(true);
} else (message instanceof GetDataMessage) {
if (Boolean.TRUE.equals(attr.get())) {
ctx.writeAndFlush(fetchSecret((GetDataMessage) o));
} else {
fail();
}
}
}
...
}
上例中,首先定义了一个AttributeKey,而后应用ChannelHandlerContext的attr办法将Attribute设置到ChannelHandlerContext中,这样该Attribute绑定到这个ChannelHandlerContext中了。后续即便应用同一个Handler在不同的Channel中该属性也是不同的。
上面是应用共享Handler的例子:
public class DataServerInitializer extends ChannelInitializer<Channel> {
private static final DataServerHandler SHARED = new DataServerHandler();
@Override
public void initChannel(Channel channel) {
channel.pipeline().addLast("handler", SHARED);
}
}
留神,在定义DataServerHandler的时候,咱们加上了@Sharable注解,如果一个ChannelHandler应用了@Sharable注解,那就意味着你能够只创立一次这个Handler,然而能够将其绑定到一个或者多个ChannelPipeline中。
留神,@Sharable注解是为java文档筹备的,并不会影响到理论的代码执行成果。
异步Handler
之前介绍了,能够通过调用pipeline.addLast办法将handler退出到pipeline中,因为pipeline是一个filter的构造,所以退出的handler是程序进行解决的。
然而,我心愿某些handler是在新的线程中执行该怎么办?如果咱们心愿这些新的线程中执行的Handler是无序的又该怎么办?
比方咱们当初有3个handler别离是MyHandler1,MyHandler2和MyHandler3。
程序执行的写法是这样的:
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("MyHandler1", new MyHandler1());
pipeline.addLast("MyHandler2", new MyHandler2());
pipeline.addLast("MyHandler3", new MyHandler3());
如果要让MyHandler3在新的线程中执行,则能够退出group选项,从而让handler在新的group中运行:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("MyHandler1", new MyHandler1());
pipeline.addLast("MyHandler2", new MyHandler2());
pipeline.addLast(group,"MyHandler3", new MyHandler3());
然而上例中DefaultEventExecutorGroup退出的Handler也是会程序执行的,如果的确不想程序执行,那么能够尝试思考应用UnorderedThreadPoolEventExecutor 。
总结
本文解说了Event、Handler和PipeLine,并举例说明他们之间的关系和相互作用。后续会从netty的具体实际登程,进一步加深对netty的了解和利用,心愿大家可能喜爱。
本文已收录于 http://www.flydean.com/05-netty-channelevent/
最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!
欢送关注我的公众号:「程序那些事」,懂技术,更懂你!
发表回复