关于netty:netty系列之EventHandler和Pipeline

6次阅读

共计 7052 个字符,预计需要花费 18 分钟才能阅读完成。

简介

上一节咱们解说了 netty 中的 Channel,晓得了 channel 是事件处理器和内部联通的桥梁。明天本文将会具体解说 netty 的剩下几个十分总要的局部 Event、Handler 和 PipeLine。

ChannelPipeline

pipeLine 是连贯 Channel 和 handler 的桥梁,它实际上是一个 filter 的实现,用于管制其中 handler 的解决形式。

当一个 channel 被创立的时候,和它对应的 ChannelPipeline 也会被创立。

先看下 ChannelPipeline 的定义:

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable 

首先 ChannelPipeline 继承自 Iterable,示意它是可遍历的,而遍历的后果就是其中一个个的 Handler。

作为一个合格的 Iterable,ChannelPipeline 提供了一系列的 add 和 remote 办法,通过这些办法就能够向 ChannelPipeline 中增加或者移除 Handler。因为 ChannelPipeline 是一个 filter,而过滤器是须要指定对应的 filter 的程序的,所以 ChannelPipeline 中有 addFirst 和 addLast 这种增加不同程序的办法。

而后能够看到 ChannelPipeline 继承了 ChannelInboundInvoker 和 ChannelOutboundInvoker 两个接口。

先看一张 channelPipeline 的工作流程图:

能够看出 ChannelPipeline 次要有两种操作,一种是读入 Inbound,一种是写出 OutBound。

对于 Socket.read() 这样的读入操作,调用的实际上就是 ChannelInboundInvoker 中的办法。对于内部的 IO 写入的申请,调用的就是 ChannelOutboundInvoker 中的办法。

留神,Inbound 和 outbound 的解决程序是相同的,比方上面的例子:

    ChannelPipeline p = ...;
   p.addLast("1", new InboundHandlerA());
   p.addLast("2", new InboundHandlerB());
   p.addLast("3", new OutboundHandlerA());
   p.addLast("4", new OutboundHandlerB());
   p.addLast("5", new InboundOutboundHandlerX());

下面的代码中咱们向 ChannelPipeline 增加了 5 个 handler,其中 2 个 InboundHandler,2 个 OutboundHandler 和一个同时解决 In 和 Out 的 Handler。

那么当 channel 遇到 inbound event 的时候,就会依照 1,2,3,4,5 的程序进行解决,然而只有 InboundHandler 能力解决 Inbound 事件,所以,真正执行的程序是 1,2,5。

同样的当 channel 遇到 outbound event 的时候,会依照 5,4,3,2,1 的程序进行执行,然而只有 outboundHandler 能力解决 Outbound 事件,所以真正执行的程序是 5,4,3.

简略的说,ChannelPipeline 指定了 Handler 的执行程序。

ChannelHandler

netty 是一个事件驱动的框架,所有的 event 都是由 Handler 来进行解决的。ChannelHandler 能够解决 IO、拦挡 IO 或者将 event 传递给 ChannelPipeline 中的下一个 Handler 进行解决。

ChannelHandler 的构造很简略,只有三个办法,别离是:

void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

依据 inbound 和 outbound 事件的不同,ChannelHandler 能够分为两类,别离是 ChannelInboundHandler 和 ChannelOutboundHandler.

因为这两个都是 interface,实现起来比拟麻烦,所以 netty 为大家提供了三个默认的实现:ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter 和 ChannelDuplexHandler。后面两个很好了解,别离是 inbound 和 outbound, 最初一个能够同时解决 inbound 和 outbound。

ChannelHandler 是由 ChannelHandlerContext 提供的,并且和 ChannelPipeline 的交互也是通过 ChannelHandlerContext 来进行的。

ChannelHandlerContext

ChannelHandlerContext 能够让 ChannelHandler 和 ChannelPipeline 或者其余的 Handler 进行交互。它就是一个上下文环境,使得 Handler 和 Channel 能够相互作用。

如能够在 ChannelHandlerContext 中,调用 channel()取得绑定的 channel。能够通过调用 handler()取得绑定的 Handler。通过调用 fire* 办法来触发 Channel 的事件。

看下 ChannelHandlerContext 的定义:

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker 

能够看到他是一个 AttributeMap 用来存储属性,还是一个 ChannelInboundInvoker 和 ChannelOutboundInvoker 用来触发和流传相应的事件。

对于 Inbound 来说流传事件的办法有:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

对于 Outbound 来说流传事件的办法有:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)

这些办法,在一个 Handler 中调用,而后将事件传递给下一个 Handler,如下所示:

   public class MyInboundHandler extends ChannelInboundHandlerAdapter {
        @Override
       public void channelActive(ChannelHandlerContext ctx) {System.out.println("Connected!");
           ctx.fireChannelActive();}
   }
  
   public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
        @Override
       public void close(ChannelHandlerContext ctx, ChannelPromise promise) {System.out.println("Closing ..");
           ctx.close(promise);
       }
   }

ChannelHandler 中的状态变量

ChannelHandler 是一个 Handler 类,个别状况下,这个类的实例是能够被多个 channel 独特应用的,前提是这个 ChannelHandler 没有共享的状态变量。

但有时候,咱们必须要在 ChannelHandler 中放弃一个状态,那么就波及到 ChannelHandler 中的状态变量的问题,看上面的一个例子:

  public interface Message {// your methods here}
  
   public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
  
       private boolean loggedIn;
  
        @Override
       public void channelRead0(ChannelHandlerContext ctx, Message message) {if (message instanceof LoginMessage) {authenticate((LoginMessage) message);
               loggedIn = true;
           } else (message instanceof GetDataMessage) {if (loggedIn) {ctx.writeAndFlush(fetchSecret((GetDataMessage) message));
               } else {fail();
               }
           }
       }
       ...
   }

这个例子中,咱们须要在收到 LoginMessage 之后,对音讯进行认证,并保留认证状态,因为业务逻辑是这样的,所以必须要有一个状态变量。

那么这样带有状态变量的 Handler 就只能绑定一个 channel,如果绑定多个 channel 就有可能呈现状态不统一的问题。一个 channel 绑定一个 Handler 实例,很简略,只须要在 initChannel 办法中应用 new 关键字新建一个对象即可。

   public class DataServerInitializer extends ChannelInitializer<Channel> {
        @Override
       public void initChannel(Channel channel) {channel.pipeline().addLast("handler", new DataServerHandler());
       }
   }

那么除了新建 handler 实例之外,还有没有其余的方法呢?当然是有的,那就是 ChannelHandlerContext 中的 AttributeKey 属性。还是下面的例子,咱们看一下应用 AttributeKey 应该怎么实现:

   public interface Message {// your methods here}
  
    @Sharable
   public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
       private final AttributeKey<Boolean> auth =
             AttributeKey.valueOf("auth");
  
        @Override
       public void channelRead(ChannelHandlerContext ctx, Message message) {Attribute<Boolean> attr = ctx.attr(auth);
           if (message instanceof LoginMessage) {authenticate((LoginMessage) o);
               attr.set(true);
           } else (message instanceof GetDataMessage) {if (Boolean.TRUE.equals(attr.get())) {ctx.writeAndFlush(fetchSecret((GetDataMessage) o));
               } else {fail();
               }
           }
       }
       ...
   }

上例中,首先定义了一个 AttributeKey,而后应用 ChannelHandlerContext 的 attr 办法将 Attribute 设置到 ChannelHandlerContext 中,这样该 Attribute 绑定到这个 ChannelHandlerContext 中了。后续即便应用同一个 Handler 在不同的 Channel 中该属性也是不同的。

上面是应用共享 Handler 的例子:

   public class DataServerInitializer extends ChannelInitializer<Channel> {private static final DataServerHandler SHARED = new DataServerHandler();
  
        @Override
       public void initChannel(Channel channel) {channel.pipeline().addLast("handler", SHARED);
       }
   }

留神,在定义 DataServerHandler 的时候,咱们加上了 @Sharable 注解,如果一个 ChannelHandler 应用了 @Sharable 注解,那就意味着你能够只创立一次这个 Handler,然而能够将其绑定到一个或者多个 ChannelPipeline 中。

留神,@Sharable 注解是为 java 文档筹备的,并不会影响到理论的代码执行成果。

异步 Handler

之前介绍了,能够通过调用 pipeline.addLast 办法将 handler 退出到 pipeline 中,因为 pipeline 是一个 filter 的构造,所以退出的 handler 是程序进行解决的。

然而,我心愿某些 handler 是在新的线程中执行该怎么办?如果咱们心愿这些新的线程中执行的 Handler 是无序的又该怎么办?

比方咱们当初有 3 个 handler 别离是 MyHandler1,MyHandler2 和 MyHandler3。

程序执行的写法是这样的:

ChannelPipeline pipeline = ch.pipeline();
  
   pipeline.addLast("MyHandler1", new MyHandler1());
   pipeline.addLast("MyHandler2", new MyHandler2());
   pipeline.addLast("MyHandler3", new MyHandler3());

如果要让 MyHandler3 在新的线程中执行,则能够退出 group 选项,从而让 handler 在新的 group 中运行:

static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
ChannelPipeline pipeline = ch.pipeline();
  
   pipeline.addLast("MyHandler1", new MyHandler1());
   pipeline.addLast("MyHandler2", new MyHandler2());
   pipeline.addLast(group,"MyHandler3", new MyHandler3());

然而上例中 DefaultEventExecutorGroup 退出的 Handler 也是会程序执行的,如果的确不想程序执行,那么能够尝试思考应用 UnorderedThreadPoolEventExecutor。

总结

本文解说了 Event、Handler 和 PipeLine,并举例说明他们之间的关系和相互作用。后续会从 netty 的具体实际登程,进一步加深对 netty 的了解和利用,心愿大家可能喜爱。

本文已收录于 http://www.flydean.com/05-netty-channelevent/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!

正文完
 0