关于netty:netty系列之EventHandler和Pipeline

简介

上一节咱们解说了netty中的Channel,晓得了channel是事件处理器和内部联通的桥梁。明天本文将会具体解说netty的剩下几个十分总要的局部Event、Handler和PipeLine。

ChannelPipeline

pipeLine是连贯Channel和handler的桥梁,它实际上是一个filter的实现,用于管制其中handler的解决形式。

当一个channel被创立的时候,和它对应的ChannelPipeline也会被创立。

先看下ChannelPipeline的定义:

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable 

首先ChannelPipeline继承自Iterable,示意它是可遍历的,而遍历的后果就是其中一个个的Handler。

作为一个合格的Iterable,ChannelPipeline提供了一系列的add和remote办法,通过这些办法就能够向ChannelPipeline中增加或者移除Handler。因为ChannelPipeline是一个filter,而过滤器是须要指定对应的filter的程序的,所以ChannelPipeline中有addFirst和addLast这种增加不同程序的办法。

而后能够看到ChannelPipeline继承了ChannelInboundInvoker和ChannelOutboundInvoker两个接口。

先看一张channelPipeline的工作流程图:

能够看出ChannelPipeline次要有两种操作,一种是读入Inbound,一种是写出OutBound。

对于Socket.read()这样的读入操作,调用的实际上就是ChannelInboundInvoker中的办法。对于内部的IO写入的申请,调用的就是ChannelOutboundInvoker中的办法。

留神,Inbound和outbound的解决程序是相同的,比方上面的例子:

    ChannelPipeline p = ...;
   p.addLast("1", new InboundHandlerA());
   p.addLast("2", new InboundHandlerB());
   p.addLast("3", new OutboundHandlerA());
   p.addLast("4", new OutboundHandlerB());
   p.addLast("5", new InboundOutboundHandlerX());

下面的代码中咱们向ChannelPipeline增加了5个handler,其中2个InboundHandler,2个OutboundHandler和一个同时解决In和Out的Handler。

那么当channel遇到inbound event的时候,就会依照1,2,3,4,5的程序进行解决,然而只有InboundHandler能力解决Inbound事件,所以,真正执行的程序是1,2,5。

同样的当channel遇到outbound event的时候,会依照5,4,3,2,1的程序进行执行,然而只有outboundHandler能力解决Outbound事件,所以真正执行的程序是5,4,3.

简略的说,ChannelPipeline指定了Handler的执行程序。

ChannelHandler

netty是一个事件驱动的框架,所有的event都是由Handler来进行解决的。ChannelHandler能够解决IO、拦挡IO或者将event传递给ChannelPipeline中的下一个Handler进行解决。

ChannelHandler的构造很简略,只有三个办法,别离是:

void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

依据inbound和outbound事件的不同,ChannelHandler能够分为两类,别离是ChannelInboundHandler 和ChannelOutboundHandler.

因为这两个都是interface,实现起来比拟麻烦,所以netty为大家提供了三个默认的实现:ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler。后面两个很好了解,别离是inbound和outbound,最初一个能够同时解决inbound和outbound。

ChannelHandler是由ChannelHandlerContext提供的,并且和ChannelPipeline的交互也是通过ChannelHandlerContext来进行的。

ChannelHandlerContext

ChannelHandlerContext能够让ChannelHandler和ChannelPipeline或者其余的Handler进行交互。它就是一个上下文环境,使得Handler和Channel能够相互作用。

如能够在ChannelHandlerContext中,调用channel()取得绑定的channel。能够通过调用handler()取得绑定的Handler。通过调用fire*办法来触发Channel的事件。

看下ChannelHandlerContext的定义:

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker 

能够看到他是一个AttributeMap用来存储属性,还是一个ChannelInboundInvoker和ChannelOutboundInvoker用来触发和流传相应的事件。

对于Inbound来说流传事件的办法有:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

对于Outbound来说流传事件的办法有:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)

这些办法,在一个Handler中调用,而后将事件传递给下一个Handler,如下所示:

   public class MyInboundHandler extends ChannelInboundHandlerAdapter {
        @Override
       public void channelActive(ChannelHandlerContext ctx) {
           System.out.println("Connected!");
           ctx.fireChannelActive();
       }
   }
  
   public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
        @Override
       public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
           System.out.println("Closing ..");
           ctx.close(promise);
       }
   }

ChannelHandler中的状态变量

ChannelHandler是一个Handler类,个别状况下,这个类的实例是能够被多个channel独特应用的,前提是这个ChannelHandler没有共享的状态变量。

但有时候,咱们必须要在ChannelHandler中放弃一个状态,那么就波及到ChannelHandler中的状态变量的问题,看上面的一个例子:

  public interface Message {
       // your methods here
   }
  
   public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
  
       private boolean loggedIn;
  
        @Override
       public void channelRead0(ChannelHandlerContext ctx, Message message) {
           if (message instanceof LoginMessage) {
               authenticate((LoginMessage) message);
               loggedIn = true;
           } else (message instanceof GetDataMessage) {
               if (loggedIn) {
                   ctx.writeAndFlush(fetchSecret((GetDataMessage) message));
               } else {
                   fail();
               }
           }
       }
       ...
   }

这个例子中,咱们须要在收到LoginMessage之后,对音讯进行认证,并保留认证状态,因为业务逻辑是这样的,所以必须要有一个状态变量。

那么这样带有状态变量的Handler就只能绑定一个channel,如果绑定多个channel就有可能呈现状态不统一的问题。一个channel绑定一个Handler实例,很简略,只须要在initChannel办法中应用new关键字新建一个对象即可。

   public class DataServerInitializer extends ChannelInitializer<Channel> {
        @Override
       public void initChannel(Channel channel) {
           channel.pipeline().addLast("handler", new DataServerHandler());
       }
   }

那么除了新建handler实例之外,还有没有其余的方法呢?当然是有的,那就是 ChannelHandlerContext 中的AttributeKey属性。还是下面的例子,咱们看一下应用AttributeKey应该怎么实现:

   public interface Message {
       // your methods here
   }
  
    @Sharable
   public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
       private final AttributeKey<Boolean> auth =
             AttributeKey.valueOf("auth");
  
        @Override
       public void channelRead(ChannelHandlerContext ctx, Message message) {
           Attribute<Boolean> attr = ctx.attr(auth);
           if (message instanceof LoginMessage) {
               authenticate((LoginMessage) o);
               attr.set(true);
           } else (message instanceof GetDataMessage) {
               if (Boolean.TRUE.equals(attr.get())) {
                   ctx.writeAndFlush(fetchSecret((GetDataMessage) o));
               } else {
                   fail();
               }
           }
       }
       ...
   }

上例中,首先定义了一个AttributeKey,而后应用ChannelHandlerContext的attr办法将Attribute设置到ChannelHandlerContext中,这样该Attribute绑定到这个ChannelHandlerContext中了。后续即便应用同一个Handler在不同的Channel中该属性也是不同的。

上面是应用共享Handler的例子:

   public class DataServerInitializer extends ChannelInitializer<Channel> {
  
       private static final DataServerHandler SHARED = new DataServerHandler();
  
        @Override
       public void initChannel(Channel channel) {
           channel.pipeline().addLast("handler", SHARED);
       }
   }

留神,在定义DataServerHandler的时候,咱们加上了@Sharable注解,如果一个ChannelHandler应用了@Sharable注解,那就意味着你能够只创立一次这个Handler,然而能够将其绑定到一个或者多个ChannelPipeline中。

留神,@Sharable注解是为java文档筹备的,并不会影响到理论的代码执行成果。

异步Handler

之前介绍了,能够通过调用pipeline.addLast办法将handler退出到pipeline中,因为pipeline是一个filter的构造,所以退出的handler是程序进行解决的。

然而,我心愿某些handler是在新的线程中执行该怎么办?如果咱们心愿这些新的线程中执行的Handler是无序的又该怎么办?

比方咱们当初有3个handler别离是MyHandler1,MyHandler2和MyHandler3。

程序执行的写法是这样的:

ChannelPipeline pipeline = ch.pipeline();
  
   pipeline.addLast("MyHandler1", new MyHandler1());
   pipeline.addLast("MyHandler2", new MyHandler2());
   pipeline.addLast("MyHandler3", new MyHandler3());

如果要让MyHandler3在新的线程中执行,则能够退出group选项,从而让handler在新的group中运行:

static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
ChannelPipeline pipeline = ch.pipeline();
  
   pipeline.addLast("MyHandler1", new MyHandler1());
   pipeline.addLast("MyHandler2", new MyHandler2());
   pipeline.addLast(group,"MyHandler3", new MyHandler3());

然而上例中DefaultEventExecutorGroup退出的Handler也是会程序执行的,如果的确不想程序执行,那么能够尝试思考应用UnorderedThreadPoolEventExecutor 。

总结

本文解说了Event、Handler和PipeLine,并举例说明他们之间的关系和相互作用。后续会从netty的具体实际登程,进一步加深对netty的了解和利用,心愿大家可能喜爱。

本文已收录于 http://www.flydean.com/05-netty-channelevent/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」,懂技术,更懂你!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理