关于java:深入理解netty从偶现宕机看netty流量控制

51次阅读

共计 4955 个字符,预计需要花费 13 分钟才能阅读完成。

一、业务背景

目前挪动端的应用场景中会用到大量的音讯推送,push 音讯能够帮忙经营人员更高效地实现经营指标(比方给用户推送营销流动或者揭示 APP 新性能)。

对于推送零碎来说须要具备以下两个个性:

  • 音讯秒级送到用户,无延时,反对每秒百万推送,单机百万长连贯。
  • 反对告诉、文本、自定义音讯透传等展示模式。正是因为以上起因,对于零碎的开发和保护带来了挑战。下图是推送零碎的简略形容(API-> 推送模块 -> 手机)。

二、问题背景

推送零碎中长连贯集群在稳定性测试、压力测试阶运行一段时间后随机会呈现一个过程挂掉的状况,概率较小(频率为一个月左右产生一次),这会影响局部客户端音讯送到的时效。

推送零碎中的长连贯节点(Broker 零碎)是基于 Netty 开发,此节点保护了服务端和手机终端的长连贯,线上问题呈现后,增加 Netty 内存泄露监控参数进行问题排查,察看多天但并未排查出问题。

因为长连贯节点是 Netty 开发,为便于读者了解,上面简略介绍一下 Netty。

三、Netty 介绍

Netty 是一个高性能、异步事件驱动的 NIO 框架,基于 Java NIO 提供的 API 实现。它提供了对 TCP、UDP 和文件传输的反对,作为以后最风行的 NIO 框架,Netty 在互联网畛域、大数据分布式计算畛域、游戏行业、通信行业等取得了宽泛的利用,HBase,Hadoop,Bees,Dubbo 等开源组件也基于 Netty 的 NIO 框架构建。

四、问题剖析

4.1 猜测

最后猜测是长连接数导致的,但通过排查日志、剖析代码,发现并不是此起因造成。

长连接数:39 万,如下图:

每个 channel 字节大小 1456, 按 40 万长连贯计算,不致于产生内存过大景象。

4.2 查看 GC 日志

查看 GC 日志,发现过程挂掉之前频繁 full GC(频率 5 分钟一次),但内存并未升高,狐疑堆外内存泄露。

4.3 剖析 heap 内存状况

ChannelOutboundBuffer 对象占将近 5G 内存,泄露起因根本能够确定:ChannelOutboundBuffer 的 entry 数过多导致,查看 ChannelOutboundBuffer 的源码能够剖析出,是 ChannelOutboundBuffer 中的数据。

没有写出去,导致始终积压;ChannelOutboundBuffer 外部是一个链表构造。

4.4 从上图剖析数据未写出去,为什么会呈现这种状况?

代码中理论有判断连贯是否可用的状况(Channel.isActive),并且会对超时的连贯进行敞开。从历史教训来看,这种状况产生在连贯半关上(客户端异样敞开)的状况比拟多 — 单方不进行数据通信无问题。

按上述猜测,测试环境进行重现和测试。

1)模仿客户端集群,并与长连贯服务器建设连贯,设置客户端节点的防火墙,模仿服务器与客户端网络异样的场景(即要模仿 Channel.isActive 调用胜利,但数据理论发送不进来的状况)。

2)调小堆外内存,继续发送测试音讯给之前的客户端。音讯大小(1K 左右)。

3)依照 128M 内存来计算,实际上调用 9W 屡次就会呈现。

五、问题解决

5.1 启用 autoRead 机制

当 channel 不可写时,敞开 autoRead;

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {if (!ctx.channel().isWritable()) {Channel channel = ctx.channel();
        ChannelInfo channelInfo = ChannelManager.CHANNEL_CHANNELINFO.get(channel);
        String clientId = "";
        if (channelInfo != null) {clientId = channelInfo.getClientId();
        }

        LOGGER.info("channel is unwritable, turn off autoread, clientId:{}", clientId);
        channel.config().setAutoRead(false);
    }
}

当数据可写时开启 autoRead;

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception
{Channel channel = ctx.channel();
    ChannelInfo channelInfo = ChannelManager.CHANNEL_CHANNELINFO.get(channel);
    String clientId = "";
    if (channelInfo != null) {clientId = channelInfo.getClientId();
    }
    if (channel.isWritable()) {LOGGER.info("channel is writable again, turn on autoread, clientId:{}", clientId);
        channel.config().setAutoRead(true);
    }
}

阐明:

autoRead 的作用是更准确的速率管制,如果关上的时候 Netty 就会帮咱们注册读事件。当注册了读事件后,如果网络可读,则 Netty 就会从 channel 读取数据。那如果 autoread 关掉后,则 Netty 会不注册读事件。

这样即便是对端发送数据过去了也不会触发读事件,从而也不会从 channel 读取到数据。当 recv_buffer 满时,也就不会再接收数据。

5.2 设置高下水位

serverBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 8 * 1024 * 1024));

注:高下水位配合前面的 isWritable 应用

5.3 减少 channel.isWritable() 的判断

channel 是否可用除了校验 channel.isActive() 还须要加上 channel.isWrite() 的判断,isActive 只是保障连贯是否激活,而是否可写由 isWrite 来决定。

private void writeBackMessage(ChannelHandlerContext ctx, MqttMessage message) {Channel channel = ctx.channel();
    // 减少 channel.isWritable() 的判断
    if (channel.isActive() && channel.isWritable()) {ChannelFuture cf = channel.writeAndFlush(message);
        if (cf.isDone() && cf.cause() != null) {LOGGER.error("channelWrite error!", cf.cause());
            ctx.close();}
    }
}

注:isWritable 能够来管制 ChannelOutboundBuffer,不让其无限度收缩。其机制就是利用设置好的 channel 高下水位来进行判断。

5.4 问题验证

批改后再进行测试,发送到 27W 次也并不报错;

六、解决思路剖析

个别 Netty 数据处理流程如下:将读取的数据交由业务线程解决,解决实现再发送进来(整个过程是异步的),Netty 为了进步网络的吞吐量,在业务层与 socket 之间减少了一个 ChannelOutboundBuffer。

在调用 channel.write 的时候,所有写出的数据其实并没有写到 socket,而是先写到 ChannelOutboundBuffer。当调用 channel.flush 的时候才真正的向 socket 写出。因为这两头有一个 buffer,就存在速率匹配了,而且这个 buffer 还是无界的(链表),也就是你如果没有管制 channel.write 的速度,会有大量的数据在这个 buffer 里沉积,如果又碰到 socket 写不出数据的时候(isActive 此时判断有效)或者写得慢的状况。

很有可能的后果就是资源耗尽,而且如果 ChannelOutboundBuffer 寄存的是 DirectByteBuffer,这会让问题更加难排查。

流程可形象如下:

从下面的剖析能够看出,步骤一写太快(快到解决不过去)或者上游发送不出数据都会造成问题,这理论是一个速率匹配问题。

七、Netty 源码阐明

超过高水位

当 ChannelOutboundBuffer 的容量超过高水位设定阈值后,isWritable() 返回 false,设置 channel 不可写(setUnwritable),并且触发 fireChannelWritabilityChanged()。

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {if (size == 0) {return;}

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {setUnwritable(invokeLater);
    }
}
private void setUnwritable(boolean invokeLater) {for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {if (oldValue == 0 && newValue != 0) {fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

低于低水位

当 ChannelOutboundBuffer 的容量低于低水位设定阈值后,isWritable() 返回 true,设置 channel 可写,并且触发 fireChannelWritabilityChanged()。

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {if (size == 0) {return;}

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {setWritable(invokeLater);
    }
}
private void setWritable(boolean invokeLater) {for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue & ~1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {if (oldValue != 0 && newValue == 0) {fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

八、总结

当 ChannelOutboundBuffer 的容量超过高水位设定阈值后,isWritable() 返回 false,表明音讯产生沉积,须要升高写入速度。

当 ChannelOutboundBuffer 的容量低于低水位设定阈值后,isWritable() 返回 true,表明音讯过少,须要进步写入速度。通过以上三个步骤批改后,部署线上察看半年未产生问题呈现。

​作者:vivo 互联网服务器团队 -Zhang Lin

正文完
 0