简介

咱们晓得proxy protocol是haproxy提出的一个代理协定,通过这个协定,所有实现这个协定的proxy或者LBS,都能够附带实在客户端的IP地址和端口号,这使得proxy protocol在理论利用中十分有用。

这么优良的协定,没有理由netty不反对。本文将谈判一下netty中对proxy protoco代理协定的反对。

netty对proxy protocol协定的反对

proxy protocol协定其实很简略,就是在申请后面带了proxy header信息。

在netty中这个header信息叫做HAProxyMessage:

public final class HAProxyMessage extends AbstractReferenceCounted {

HAProxyMessage是一个ReferenceCounted,这一点和ByteBuf很相似,阐明HAProxyMessage保留着和ByteBuf很相似的个性。

依据proxy protocol协定,该协定能够分为两个版本,别离是v1和v2,其中v1版本是文本协定,而v2版本反对二进制的格局。

显然从代码编写和调试的角度来看v1更加敌对,然而从程序的角度来看,v2可能性能更高。

HAProxyMessage中有个专门的HAProxyProtocolVersion类,来示意proxy protocol的版本信息:

public enum HAProxyProtocolVersion {    V1(VERSION_ONE_BYTE),    V2(VERSION_TWO_BYTE);

HAProxyProtocolVersion是一个枚举类,在它外面定义了和proxy协定绝对应的两个版本号。

在版本号之后是command,在netty中用HAProxyCommand来示意:

public enum HAProxyCommand {    LOCAL(HAProxyConstants.COMMAND_LOCAL_BYTE),    PROXY(HAProxyConstants.COMMAND_PROXY_BYTE);

HAProxyCommand也是一个枚举类,外面定义了两个command的值,别离是local和proxy。

其中local示意该申请是代理服务器被动发动的,而不是客户端发动的,比方监控检测等申请。

proxy示意该申请是一个代理申请。

接下来是AddressFamily和TransportProtocol,这两个字段用同一个byte来示意,所以这两个类都是HAProxyProxiedProtocol的外部类。

先看下AddressFamily的定义:

    public enum AddressFamily {        AF_UNSPEC(AF_UNSPEC_BYTE),        AF_IPv4(AF_IPV4_BYTE),        AF_IPv6(AF_IPV6_BYTE),        AF_UNIX(AF_UNIX_BYTE);

AddressFamily中定义了4个address family类型,别离是unspec,ipv4,ipv6和unix。别离对应未知family,ipv4,ipv6和unix domain socket。

再看下TransportProtocol的定义:

    public enum TransportProtocol {        UNSPEC(TRANSPORT_UNSPEC_BYTE),        STREAM(TRANSPORT_STREAM_BYTE),        DGRAM(TRANSPORT_DGRAM_BYTE);

TransportProtocol有3个值,别离是unspec,stream和dgram。别离对应未知协定,http/https协定,udp/tcp协定。

因为AddressFamily和TransportProtocol实际上是同一个byte,所以通过组合之后能够失去上面的几个枚举值:

    UNKNOWN(TPAF_UNKNOWN_BYTE, AddressFamily.AF_UNSPEC, TransportProtocol.UNSPEC),    TCP4(TPAF_TCP4_BYTE, AddressFamily.AF_IPv4, TransportProtocol.STREAM),    TCP6(TPAF_TCP6_BYTE, AddressFamily.AF_IPv6, TransportProtocol.STREAM),    UDP4(TPAF_UDP4_BYTE, AddressFamily.AF_IPv4, TransportProtocol.DGRAM),    UDP6(TPAF_UDP6_BYTE, AddressFamily.AF_IPv6, TransportProtocol.DGRAM),    UNIX_STREAM(TPAF_UNIX_STREAM_BYTE, AddressFamily.AF_UNIX, TransportProtocol.STREAM),    UNIX_DGRAM(TPAF_UNIX_DGRAM_BYTE, AddressFamily.AF_UNIX, TransportProtocol.DGRAM);

以上的枚举值也是HAProxyProxiedProtocol中定义的值。

接下就是源ip地址,指标地ip地址,源端口和指标端口这几个值,定义为属性示意如下:

    private final String sourceAddress;    private final String destinationAddress;    private final int sourcePort;    private final int destinationPort;

最初,proxy protocol中还能够蕴含额定的字段tlv,tlv在netty中也是一种byteBuf,应用HAProxyTLV示意:

public class HAProxyTLV extends DefaultByteBufHolder 

因为tlv是key value构造,所以看下HAProxyTLV的构造函数:

    public HAProxyTLV(Type type, ByteBuf content) {        this(type, Type.byteValueForType(type), content);    }

HAProxyTLV承受一个type和byteBuf的value。

Type是一个枚举类,在netty中能够反对上面的值:

    public enum Type {        PP2_TYPE_ALPN,        PP2_TYPE_AUTHORITY,        PP2_TYPE_SSL,        PP2_TYPE_SSL_VERSION,        PP2_TYPE_SSL_CN,        PP2_TYPE_NETNS,        OTHER;

在HAProxyMessage中,tlv是一个list来保留的:

private final List<HAProxyTLV> tlvs;

到此,所有HAProxyMessage所须要的参数都齐了,咱们看下HAProxyMessage的构造函数:

    public HAProxyMessage(            HAProxyProtocolVersion protocolVersion, HAProxyCommand command, HAProxyProxiedProtocol proxiedProtocol,            String sourceAddress, String destinationAddress, int sourcePort, int destinationPort,            List<? extends HAProxyTLV> tlvs)

HAProxyMessage会将所有的参数都存储到本地的变量中,供后续应用。

因为proxy protocol有两个版本,v1和v2,所以HAProxyMessage中提供了两个将header编码为AProxyMessage对象的办法,别离是:

static HAProxyMessage decodeHeader(ByteBuf header) 

和:

static HAProxyMessage decodeHeader(String header)

有了proxy protocol的java示意之后,咱们再来看一下HAProxyMessage的编码解码器。

HAProxyMessage的编码解码器

netty对HAProxyMessage对象的反对体现在两个中央,netty提供了两个类别离对HAProxyMessage进行编码和解码,这两个类是HAProxyMessageEncoder和HAProxyMessageDecoder。

先看一下HAProxyMessageEncoder:

public final class HAProxyMessageEncoder extends MessageToByteEncoder<HAProxyMessage> 

HAProxyMessageEncoder继承自MessageToByteEncoder,传入的泛型是HAProxyMessage,示意是将HAProxyMessage编码成为ByteBuf。

它的encode办法很简略,依据HAProxyMessage传入的message版本信息,别离进行编码:

    protected void encode(ChannelHandlerContext ctx, HAProxyMessage msg, ByteBuf out) throws Exception {        switch (msg.protocolVersion()) {            case V1:                encodeV1(msg, out);                break;            case V2:                encodeV2(msg, out);                break;            default:                throw new HAProxyProtocolException("Unsupported version: " + msg.protocolVersion());        }    }

HAProxyMessageDecoder是跟HAProxyMessageEncoder相同的动作,是将接管到的ByteBuf解析成为HAProxyMessage:

public class HAProxyMessageDecoder extends ByteToMessageDecoder 

因为HAProxyMessage有两个版本,那么怎么判断接管到的ByeBuf是哪个版本呢?

其实很简略,因为v1版本和v2版本的开始字符是不一样的,v1版本的结尾是一个text:"PROXY", v2版本的结尾是一个固定的二进制串,如下所示:

    static final byte[] BINARY_PREFIX = {            (byte) 0x0D,            (byte) 0x0A,            (byte) 0x0D,            (byte) 0x0A,            (byte) 0x00,            (byte) 0x0D,            (byte) 0x0A,            (byte) 0x51,            (byte) 0x55,            (byte) 0x49,            (byte) 0x54,            (byte) 0x0A    };    static final byte[] TEXT_PREFIX = {            (byte) 'P',            (byte) 'R',            (byte) 'O',            (byte) 'X',            (byte) 'Y',    };

看下它的decode办法实现:

    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {        if (version == -1) {            if ((version = findVersion(in)) == -1) {                return;            }        }        ByteBuf decoded;        if (version == 1) {            decoded = decodeLine(ctx, in);        } else {            decoded = decodeStruct(ctx, in);        }        if (decoded != null) {            finished = true;            try {                if (version == 1) {                    out.add(HAProxyMessage.decodeHeader(decoded.toString(CharsetUtil.US_ASCII)));                } else {                    out.add(HAProxyMessage.decodeHeader(decoded));                }            } catch (HAProxyProtocolException e) {                fail(ctx, null, e);            }        }    }

下面代码的逻辑是先从ByteBuf中依据版本号decode出header信息放到ByteBuf中。

而后再依据版本号的不同,别离调用HAProxyMessage的两个不同版本的decodeHeader办法进行解码。最终失去HAProxyMessage。

netty中proxy protocol的代码示例

有了netty对proxy protocol的反对,那么在netty中搭建反对proxy protocol的服务器和客户端就很容易了。

先看一下如何搭建反对proxy protocol的服务器:

    private static void startServer(int port) throws InterruptedException {        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)             .channel(NioServerSocketChannel.class)             .handler(new LoggingHandler(LogLevel.INFO))             .childHandler(new ServerInitializer());            b.bind(port).sync().channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }

代码和惯例的netty server一样,这里应用了NioEventLoopGroup和NioServerSocketChannel,搭建了一个反对TCP协定的netty服务器。

ServerInitializer中蕴含了netty自带的HAProxy编码器和自定义的音讯处理器:

class ServerInitializer extends ChannelInitializer<SocketChannel> {    @Override    public void initChannel(SocketChannel ch) throws Exception {        ch.pipeline().addLast(                new LoggingHandler(LogLevel.DEBUG),                new HAProxyMessageDecoder(),                new SimpleChannelInboundHandler() {                    @Override                    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {                        if (msg instanceof HAProxyMessage) {                            log.info("proxy message is : {}", msg);                        } else if (msg instanceof ByteBuf) {                            log.info("bytebuf message is : {}", ByteBufUtil.prettyHexDump((ByteBuf) msg));                        }                    }                });    }}

这里应用netty自带的HAProxyMessageDecoder,用来将ByteBuf音讯解码为HAProxyMessage,而后在自定义的SimpleChannelInboundHandler中对HAProxyMessage进行解决。

这里的服务器能够解决两种音讯,一种是HAProxyMessage,一种是原始的ByteBuf。解决的后果就是将音讯打印进去。

而后看下客户端的定义:

EventLoopGroup group = new NioEventLoopGroup();            Bootstrap b = new Bootstrap();            b.group(group)                    .channel(NioSocketChannel.class)                    .handler(new ClientHander());            Channel ch = b.connect(host, port).sync().channel();

客户端应用的是EventLoopGroup和NioSocketChannel,是基于TCP协定的申请。

这里增加了自定义的handler:ClientHander,ClientHander继承自ChannelOutboundHandlerAdapter用来对client收回的音讯进行解决。

这里看一下它的handlerAdded办法:

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        ctx.pipeline().addBefore(ctx.name(), null, HAProxyMessageEncoder.INSTANCE);        super.handlerAdded(ctx);    }

能够看到handlerAdded办法向channelPipeline中增加了HAProxyMessageEncoder,用于编码HAProxyMessage。

因为对于一个connection来说,HAProxyMessage只须要用到一次,后续的失常音讯就不须要这个编码器了,所以咱们须要在write办法中监听HAProxyMessage的状态,如果写入胜利之后,就从pipeline中移出HAProxyMessageEncoder和ClientHander。

    public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        ChannelFuture future1 = ctx.write(msg, promise);        if (msg instanceof HAProxyMessage) {            future1.addListener((ChannelFutureListener) future2 -> {                if (future2.isSuccess()) {                    ctx.pipeline().remove(HAProxyMessageEncoder.INSTANCE);                    ctx.pipeline().remove(ClientHander.this);                } else {                    ctx.close();                }            });        }    }

最初咱们构建了一个虚构的HAProxyMessage,而后通过netty客户端进行发送:

HAProxyMessage message = new HAProxyMessage(                    HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, HAProxyProxiedProtocol.TCP4,                    "127.0.0.1", "127.0.0.2", 8000, 9000);            ch.writeAndFlush(message).sync();            ch.writeAndFlush(Unpooled.copiedBuffer("this is a proxy protocol message!", CharsetUtil.UTF_8)).sync();            ch.close().sync();

总结

下面的代码只是一个简略的模仿proxy protocol在netty中的应用状况,并不代表下面的代码就能够在理论的我的项目中利用了。如果你想应用的话,能够在上面的代码下面持续丰盛和欠缺。

本文的代码,大家能够参考:

learn-netty4