乐趣区

关于java:netty系列之channelPipeline详解

简介

咱们在介绍 channel 的时候提到过,简直 channel 中所有的实现都是通过 channelPipeline 进行的,作为一个 pipline,它到底是如何工作的呢?

一起来看看吧。

ChannelPipeline

ChannelPipeline 是一个 interface,它继承了三个接口,别离是 ChannelInboundInvoker,ChannelOutboundInvoker 和 Iterable:

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> 

继承自 ChannelInboundInvoker,示意 ChannelPipeline 能够触发 channel inboud 的一些事件,比方:

ChannelInboundInvoker fireChannelRegistered();
ChannelInboundInvoker fireChannelUnregistered();
ChannelInboundInvoker fireChannelActive();
ChannelInboundInvoker fireChannelInactive();
ChannelInboundInvoker fireExceptionCaught(Throwable cause);
ChannelInboundInvoker fireUserEventTriggered(Object event);
ChannelInboundInvoker fireChannelRead(Object msg);
ChannelInboundInvoker fireChannelReadComplete();
ChannelInboundInvoker fireChannelWritabilityChanged();

继承自 ChannelOutboundInvoker,示意 ChannelPipeline 能够进行一些 channel 的被动操作,如:bind,connect,disconnect,close,deregister,read,write,flush 等操作。

继承自 Iterable,示意 ChannelPipeline 是可遍历的,为什么 ChannelPipeline 是可遍历的呢?

因为 ChannelPipeline 中能够增加一个或者多个 ChannelHandler,ChannelPipeline 能够看做是一个 ChannelHandler 的汇合。

比方 ChannelPipeline 提供了一系列的增加 ChannelHandler 的办法:

ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addFirst(ChannelHandler... handlers);

ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

能够从后面增加,也能够从前面增加,或者从特定的地位增加 handler。

另外还能够从 pipeline 中删除特定的 channelHandler,或者移出和替换特定地位的 handler:

ChannelPipeline remove(ChannelHandler handler);
ChannelHandler remove(String name);
ChannelHandler removeFirst();
ChannelHandler removeLast();
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

当然,更少不了对应的查问操作:

ChannelHandler first();
ChannelHandler last();
ChannelHandler get(String name);
List<String> names();

还能够依据传入的 ChannelHandler 取得 handler 对应的 ChannelHandlerContext。

ChannelHandlerContext context(ChannelHandler handler);

ChannelPipeline 中还有一些触发 channel 相干的事件,如:

    ChannelPipeline fireChannelRegistered();
    ChannelPipeline fireChannelUnregistered();
    ChannelPipeline fireChannelActive();
    ChannelPipeline fireChannelInactive();
    ChannelPipeline fireExceptionCaught(Throwable cause);
    ChannelPipeline fireUserEventTriggered(Object event);
    ChannelPipeline fireChannelRead(Object msg);
    ChannelPipeline fireChannelReadComplete();
    ChannelPipeline fireChannelWritabilityChanged();

事件传递

那么有些敌人可能会问了,既然 ChannelPipeline 中蕴含了很多个 handler,那么 handler 中的事件是怎么传递的呢?

其实这些事件是通过调用 ChannelHandlerContext 中的相应办法来触发的。

对于 Inbound 事件来说,能够调用上面的办法,进行事件的传递:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

对于 Outbound 事件来说,能够调用上面的办法,进行事件的传递:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)

具体而言,就是在 handler 中调用 ChannelHandlerContext 中对应的办法:

   public class MyInboundHandler extends ChannelInboundHandlerAdapter {
        @Override
       public void channelActive(ChannelHandlerContext ctx) {System.out.println("Connected!");
           ctx.fireChannelActive();}
   }
  
   public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
        @Override
       public void close(ChannelHandlerContext ctx, ChannelPromise promise) {System.out.println("Closing ..");
           ctx.close(promise);
       }
   }
   

DefaultChannelPipeline

ChannelPipeline 有一个官网的实现叫做 DefaultChannelPipeline,因为对于 pipeline 来说,次要的性能就是进行 handler 的治理和事件传递,绝对于而言性能比较简单,然而他也有一些特地的实现中央, 比方它有两个 AbstractChannelHandlerContext 类型的 head 和 tail。

咱们晓得 ChannelPipeline 实际上是很多 handler 的汇合,那么这些汇合是怎么进行存储的呢?这种存储的数据结构就是 AbstractChannelHandlerContext。每个 AbstractChannelHandlerContext 中都有一个 next 节点和一个 prev 节点,用来组成一个双向链表。

同样的在 DefaultChannelPipeline 中应用 head 和 tail 来将封装好的 handler 存储起来。

留神,这里的 head 和 tail 尽管都是 AbstractChannelHandlerContext,然而两者有稍许不同。先看下 head 和 tail 的定义:

    protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

在 DefaultChannelPipeline 的构造函数中,对 tail 和 head 进行初始化,其中 tail 是 TailContext,而 head 是 HeadContext。

其中 TailContext 实现了 ChannelInboundHandler 接口:

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler

而 HeadContext 实现了 ChannelOutboundHandler 和 ChannelInboundHandler 接口:

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler 

上面咱们以 addFirst 办法为例,来看一下 handler 是怎么被退出 pipline 的:

    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {checkMultiplicity(handler);
            name = filterName(name, handler);

            newCtx = newContext(group, name, handler);

            addFirst0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

它的工作逻辑是首先依据传入的 handler 构建一个新的 context,而后调用 addFirst0 办法,将 context 退出 AbstractChannelHandlerContext 组成的双向链表中:

    private void addFirst0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext nextCtx = head.next;
        newCtx.prev = head;
        newCtx.next = nextCtx;
        head.next = newCtx;
        nextCtx.prev = newCtx;
    }

而后调用 callHandlerAdded0 办法来触发 context 的 handlerAdded 办法。

总结

channelPipeline 负责管理 channel 的各种 handler,在 DefaultChannelPipeline 中应用了 AbstractChannelHandlerContext 的 head 和 tail 来对多个 handler 进行存储,同时借用这个链式构造对 handler 进行各种治理,十分不便。

本文已收录于 http://www.flydean.com/04-3-netty-channelpipeline/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!

退出移动版