乐趣区

关于netty:netty系列之在netty中使用proxy-protocol

简介

咱们晓得 proxy protocol 是 haproxy 提出的一个代理协定,通过这个协定, 所有实现这个协定的 proxy 或者 LBS, 都能够附带实在客户端的 IP 地址和端口号,这使得 proxy protocol 在理论利用中十分有用。

这么优良的协定,没有理由 netty 不反对。本文将谈判一下 netty 中对 proxy protoco 代理协定的反对。

netty 对 proxy protocol 协定的反对

proxy protocol 协定其实很简略,就是在申请后面带了 proxy header 信息。

在 netty 中这个 header 信息叫做 HAProxyMessage:

public final class HAProxyMessage extends AbstractReferenceCounted {

HAProxyMessage 是一个 ReferenceCounted,这一点和 ByteBuf 很相似,阐明 HAProxyMessage 保留着和 ByteBuf 很相似的个性。

依据 proxy protocol 协定,该协定能够分为两个版本,别离是 v1 和 v2, 其中 v1 版本是文本协定,而 v2 版本反对二进制的格局。

显然从代码编写和调试的角度来看 v1 更加敌对,然而从程序的角度来看,v2 可能性能更高。

HAProxyMessage 中有个专门的 HAProxyProtocolVersion 类,来示意 proxy protocol 的版本信息:

public enum HAProxyProtocolVersion {V1(VERSION_ONE_BYTE),

    V2(VERSION_TWO_BYTE);

HAProxyProtocolVersion 是一个枚举类,在它外面定义了和 proxy 协定绝对应的两个版本号。

在版本号之后是 command,在 netty 中用 HAProxyCommand 来示意:

public enum HAProxyCommand {LOCAL(HAProxyConstants.COMMAND_LOCAL_BYTE),

    PROXY(HAProxyConstants.COMMAND_PROXY_BYTE);

HAProxyCommand 也是一个枚举类,外面定义了两个 command 的值,别离是 local 和 proxy。

其中 local 示意该申请是代理服务器被动发动的,而不是客户端发动的,比方监控检测等申请。

proxy 示意该申请是一个代理申请。

接下来是 AddressFamily 和 TransportProtocol, 这两个字段用同一个 byte 来示意,所以这两个类都是 HAProxyProxiedProtocol 的外部类。

先看下 AddressFamily 的定义:

    public enum AddressFamily {AF_UNSPEC(AF_UNSPEC_BYTE),

        AF_IPv4(AF_IPV4_BYTE),

        AF_IPv6(AF_IPV6_BYTE),

        AF_UNIX(AF_UNIX_BYTE);

AddressFamily 中定义了 4 个 address family 类型,别离是 unspec,ipv4,ipv6 和 unix。别离对应未知 family,ipv4,ipv6 和 unix domain socket。

再看下 TransportProtocol 的定义:

    public enum TransportProtocol {UNSPEC(TRANSPORT_UNSPEC_BYTE),

        STREAM(TRANSPORT_STREAM_BYTE),

        DGRAM(TRANSPORT_DGRAM_BYTE);

TransportProtocol 有 3 个值,别离是 unspec,stream 和 dgram。别离对应未知协定,http/https 协定,udp/tcp 协定。

因为 AddressFamily 和 TransportProtocol 实际上是同一个 byte,所以通过组合之后能够失去上面的几个枚举值:

    UNKNOWN(TPAF_UNKNOWN_BYTE, AddressFamily.AF_UNSPEC, TransportProtocol.UNSPEC),

    TCP4(TPAF_TCP4_BYTE, AddressFamily.AF_IPv4, TransportProtocol.STREAM),

    TCP6(TPAF_TCP6_BYTE, AddressFamily.AF_IPv6, TransportProtocol.STREAM),

    UDP4(TPAF_UDP4_BYTE, AddressFamily.AF_IPv4, TransportProtocol.DGRAM),

    UDP6(TPAF_UDP6_BYTE, AddressFamily.AF_IPv6, TransportProtocol.DGRAM),

    UNIX_STREAM(TPAF_UNIX_STREAM_BYTE, AddressFamily.AF_UNIX, TransportProtocol.STREAM),

    UNIX_DGRAM(TPAF_UNIX_DGRAM_BYTE, AddressFamily.AF_UNIX, TransportProtocol.DGRAM);

以上的枚举值也是 HAProxyProxiedProtocol 中定义的值。

接下就是源 ip 地址,指标地 ip 地址,源端口和指标端口这几个值,定义为属性示意如下:

    private final String sourceAddress;
    private final String destinationAddress;
    private final int sourcePort;
    private final int destinationPort;

最初,proxy protocol 中还能够蕴含额定的字段 tlv,tlv 在 netty 中也是一种 byteBuf,应用 HAProxyTLV 示意:

public class HAProxyTLV extends DefaultByteBufHolder 

因为 tlv 是 key value 构造,所以看下 HAProxyTLV 的构造函数:

    public HAProxyTLV(Type type, ByteBuf content) {this(type, Type.byteValueForType(type), content);
    }

HAProxyTLV 承受一个 type 和 byteBuf 的 value。

Type 是一个枚举类,在 netty 中能够反对上面的值:

    public enum Type {
        PP2_TYPE_ALPN,
        PP2_TYPE_AUTHORITY,
        PP2_TYPE_SSL,
        PP2_TYPE_SSL_VERSION,
        PP2_TYPE_SSL_CN,
        PP2_TYPE_NETNS,
        OTHER;

在 HAProxyMessage 中,tlv 是一个 list 来保留的:

private final List<HAProxyTLV> tlvs;

到此,所有 HAProxyMessage 所须要的参数都齐了,咱们看下 HAProxyMessage 的构造函数:

    public HAProxyMessage(
            HAProxyProtocolVersion protocolVersion, HAProxyCommand command, HAProxyProxiedProtocol proxiedProtocol,
            String sourceAddress, String destinationAddress, int sourcePort, int destinationPort,
            List<? extends HAProxyTLV> tlvs)

HAProxyMessage 会将所有的参数都存储到本地的变量中,供后续应用。

因为 proxy protocol 有两个版本,v1 和 v2,所以 HAProxyMessage 中提供了两个将 header 编码为 AProxyMessage 对象的办法,别离是:

static HAProxyMessage decodeHeader(ByteBuf header) 

和:

static HAProxyMessage decodeHeader(String header)

有了 proxy protocol 的 java 示意之后,咱们再来看一下 HAProxyMessage 的编码解码器。

HAProxyMessage 的编码解码器

netty 对 HAProxyMessage 对象的反对体现在两个中央,netty 提供了两个类别离对 HAProxyMessage 进行编码和解码,这两个类是 HAProxyMessageEncoder 和 HAProxyMessageDecoder。

先看一下 HAProxyMessageEncoder:

public final class HAProxyMessageEncoder extends MessageToByteEncoder<HAProxyMessage> 

HAProxyMessageEncoder 继承自 MessageToByteEncoder,传入的泛型是 HAProxyMessage,示意是将 HAProxyMessage 编码成为 ByteBuf。

它的 encode 办法很简略,依据 HAProxyMessage 传入的 message 版本信息,别离进行编码:

    protected void encode(ChannelHandlerContext ctx, HAProxyMessage msg, ByteBuf out) throws Exception {switch (msg.protocolVersion()) {
            case V1:
                encodeV1(msg, out);
                break;
            case V2:
                encodeV2(msg, out);
                break;
            default:
                throw new HAProxyProtocolException("Unsupported version:" + msg.protocolVersion());
        }
    }

HAProxyMessageDecoder 是跟 HAProxyMessageEncoder 相同的动作,是将接管到的 ByteBuf 解析成为 HAProxyMessage:

public class HAProxyMessageDecoder extends ByteToMessageDecoder 

因为 HAProxyMessage 有两个版本,那么怎么判断接管到的 ByeBuf 是哪个版本呢?

其实很简略,因为 v1 版本和 v2 版本的开始字符是不一样的,v1 版本的结尾是一个 text:”PROXY”, v2 版本的结尾是一个固定的二进制串, 如下所示:

    static final byte[] BINARY_PREFIX = {(byte) 0x0D,
            (byte) 0x0A,
            (byte) 0x0D,
            (byte) 0x0A,
            (byte) 0x00,
            (byte) 0x0D,
            (byte) 0x0A,
            (byte) 0x51,
            (byte) 0x55,
            (byte) 0x49,
            (byte) 0x54,
            (byte) 0x0A
    };

    static final byte[] TEXT_PREFIX = {(byte) 'P',
            (byte) 'R',
            (byte) 'O',
            (byte) 'X',
            (byte) 'Y',
    };

看下它的 decode 办法实现:

    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (version == -1) {if ((version = findVersion(in)) == -1) {return;}
        }

        ByteBuf decoded;

        if (version == 1) {decoded = decodeLine(ctx, in);
        } else {decoded = decodeStruct(ctx, in);
        }

        if (decoded != null) {
            finished = true;
            try {if (version == 1) {out.add(HAProxyMessage.decodeHeader(decoded.toString(CharsetUtil.US_ASCII)));
                } else {out.add(HAProxyMessage.decodeHeader(decoded));
                }
            } catch (HAProxyProtocolException e) {fail(ctx, null, e);
            }
        }
    }

下面代码的逻辑是先从 ByteBuf 中依据版本号 decode 出 header 信息放到 ByteBuf 中。

而后再依据版本号的不同,别离调用 HAProxyMessage 的两个不同版本的 decodeHeader 办法进行解码。最终失去 HAProxyMessage。

netty 中 proxy protocol 的代码示例

有了 netty 对 proxy protocol 的反对,那么在 netty 中搭建反对 proxy protocol 的服务器和客户端就很容易了。

先看一下如何搭建反对 proxy protocol 的服务器:

    private static void startServer(int port) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ServerInitializer());
            b.bind(port).sync().channel().closeFuture().sync();
        } finally {bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}
    }

代码和惯例的 netty server 一样,这里应用了 NioEventLoopGroup 和 NioServerSocketChannel, 搭建了一个反对 TCP 协定的 netty 服务器。

ServerInitializer 中蕴含了 netty 自带的 HAProxy 编码器和自定义的音讯处理器:

class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG),
                new HAProxyMessageDecoder(),
                new SimpleChannelInboundHandler() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {if (msg instanceof HAProxyMessage) {log.info("proxy message is : {}", msg);
                        } else if (msg instanceof ByteBuf) {log.info("bytebuf message is : {}", ByteBufUtil.prettyHexDump((ByteBuf) msg));
                        }
                    }
                });
    }
}

这里应用 netty 自带的 HAProxyMessageDecoder,用来将 ByteBuf 音讯解码为 HAProxyMessage,而后在自定义的 SimpleChannelInboundHandler 中对 HAProxyMessage 进行解决。

这里的服务器能够解决两种音讯,一种是 HAProxyMessage,一种是原始的 ByteBuf。解决的后果就是将音讯打印进去。

而后看下客户端的定义:

EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ClientHander());
            Channel ch = b.connect(host, port).sync().channel();

客户端应用的是 EventLoopGroup 和 NioSocketChannel,是基于 TCP 协定的申请。

这里增加了自定义的 handler:ClientHander,ClientHander 继承自 ChannelOutboundHandlerAdapter 用来对 client 收回的音讯进行解决。

这里看一下它的 handlerAdded 办法:

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {ctx.pipeline().addBefore(ctx.name(), null, HAProxyMessageEncoder.INSTANCE);
        super.handlerAdded(ctx);
    }

能够看到 handlerAdded 办法向 channelPipeline 中增加了 HAProxyMessageEncoder,用于编码 HAProxyMessage。

因为对于一个 connection 来说,HAProxyMessage 只须要用到一次,后续的失常音讯就不须要这个编码器了,所以咱们须要在 write 办法中监听 HAProxyMessage 的状态,如果写入胜利之后,就从 pipeline 中移出 HAProxyMessageEncoder 和 ClientHander。

    public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ChannelFuture future1 = ctx.write(msg, promise);
        if (msg instanceof HAProxyMessage) {future1.addListener((ChannelFutureListener) future2 -> {if (future2.isSuccess()) {ctx.pipeline().remove(HAProxyMessageEncoder.INSTANCE);
                    ctx.pipeline().remove(ClientHander.this);
                } else {ctx.close();
                }
            });
        }
    }

最初咱们构建了一个虚构的 HAProxyMessage,而后通过 netty 客户端进行发送:

HAProxyMessage message = new HAProxyMessage(
                    HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, HAProxyProxiedProtocol.TCP4,
                    "127.0.0.1", "127.0.0.2", 8000, 9000);
            ch.writeAndFlush(message).sync();
            ch.writeAndFlush(Unpooled.copiedBuffer("this is a proxy protocol message!", CharsetUtil.UTF_8)).sync();
            ch.close().sync();

总结

下面的代码只是一个简略的模仿 proxy protocol 在 netty 中的应用状况,并不代表下面的代码就能够在理论的我的项目中利用了。如果你想应用的话,能够在上面的代码下面持续丰盛和欠缺。

本文的代码,大家能够参考:

learn-netty4

退出移动版