乐趣区

聊聊storm client的netty buffer watermark


本文主要研究一下 storm client 的 netty buffer watermark
Config
storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java
/**
* Netty based messaging: The netty write buffer high watermark in bytes.
* <p>
* If the number of bytes queued in the netty’s write buffer exceeds this value, the netty {@code Channel.isWritable()} will start to
* return {@code false}. The client will wait until the value falls below the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK
* low water mark}.
* </p>
*/
@isInteger
@isPositiveNumber
public static final String STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK = “storm.messaging.netty.buffer.high.watermark”;
/**
* Netty based messaging: The netty write buffer low watermark in bytes.
* <p>
* Once the number of bytes queued in the write buffer exceeded the {@linkplain #STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK high water
* mark} and then dropped down below this value, the netty {@code Channel.isWritable()} will start to return true.
* </p>
*/
@isInteger
@isPositiveNumber
public static final String STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK = “storm.messaging.netty.buffer.low.watermark”;

这里有两个相关的参数,分别是 storm.messaging.netty.buffer.high.watermark 以及 storm.messaging.netty.buffer.low.watermark
defaults.yaml

# The netty write buffer high watermark in bytes.
# If the number of bytes queued in the netty’s write buffer exceeds this value, the netty client will block
# until the value falls below the low water mark.
storm.messaging.netty.buffer.high.watermark: 16777216 # 16 MB
# The netty write buffer low watermark in bytes.
# Once the number of bytes queued in the write buffer exceeded the high water mark and then
# dropped down below this value, any blocked clients will unblock and start processing further messages.
storm.messaging.netty.buffer.low.watermark: 8388608 # 8 MB
在 defaults.yaml 文件中,low.watermark 默认大小为 8388608,即 8M;high.watermark 默认大小为 16777216,即 16M
Client
storm-2.0.0/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus,
EventLoopGroup eventLoopGroup, HashedWheelTimer scheduler, String host,
int port) {
this.topoConf = topoConf;
closing = false;
this.scheduler = scheduler;
int bufferSize = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
int lowWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK));
int highWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK));
// if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false
saslChannelReady.set(!ObjectReader.getBoolean(topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
LOG.info(“Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark: {}, highWatermark: {}”,
host, port, bufferSize, lowWatermark, highWatermark);

int minWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
int maxWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, -1);

// Initiate connection to remote destination
this.eventLoopGroup = eventLoopGroup;
// Initiate connection to remote destination
bootstrap = new Bootstrap()
.group(this.eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_SNDBUF, bufferSize)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWatermark, highWatermark))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(new StormClientPipelineFactory(this, remoteBpStatus, topoConf));
dstAddress = new InetSocketAddress(host, port);
dstAddressPrefixedName = prefixedName(dstAddress);
launchChannelAliveThread();
scheduleConnect(NO_DELAY_MS);
int messageBatchSize = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
batcher = new MessageBuffer(messageBatchSize);
String clazz = (String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY);
if (clazz == null) {
waitStrategy = new WaitStrategyProgressive();
} else {
waitStrategy = ReflectionUtils.newInstance(clazz);
}
waitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
}
这里根据 lowWatermark 及 highWatermark 创建了 WriteBufferWaterMark 对象,设置到 ChannelOption.WRITE_BUFFER_WATER_MARK
WriteBufferWaterMark
netty-all-4.1.25.Final-sources.jar!/io/netty/channel/WriteBufferWaterMark.java
/**
* WriteBufferWaterMark is used to set low water mark and high water mark for the write buffer.
* <p>
* If the number of bytes queued in the write buffer exceeds the
* {@linkplain #high high water mark}, {@link Channel#isWritable()}
* will start to return {@code false}.
* <p>
* If the number of bytes queued in the write buffer exceeds the
* {@linkplain #high high water mark} and then
* dropped down below the {@linkplain #low low water mark},
* {@link Channel#isWritable()} will start to return
* {@code true} again.
*/
public final class WriteBufferWaterMark {

private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;

public static final WriteBufferWaterMark DEFAULT =
new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false);

private final int low;
private final int high;

/**
* Create a new instance.
*
* @param low low water mark for write buffer.
* @param high high water mark for write buffer
*/
public WriteBufferWaterMark(int low, int high) {
this(low, high, true);
}

/**
* This constructor is needed to keep backward-compatibility.
*/
WriteBufferWaterMark(int low, int high, boolean validate) {
if (validate) {
if (low < 0) {
throw new IllegalArgumentException(“write buffer’s low water mark must be >= 0”);
}
if (high < low) {
throw new IllegalArgumentException(
“write buffer’s high water mark cannot be less than ” +
” low water mark (” + low + “): ” +
high);
}
}
this.low = low;
this.high = high;
}

/**
* Returns the low water mark for the write buffer.
*/
public int low() {
return low;
}

/**
* Returns the high water mark for the write buffer.
*/
public int high() {
return high;
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder(55)
.append(“WriteBufferWaterMark(low: “)
.append(low)
.append(“, high: “)
.append(high)
.append(“)”);
return builder.toString();
}

}
从注释里头可以看到这两个参数控制的是 Channel.isWritable() 方法
ChannelOutboundBuffer.bytesBeforeWritable
netty-all-4.1.25.Final-sources.jar!/io/netty/channel/ChannelOutboundBuffer.java
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, “unwritable”);

private volatile int unwritable;

/**
* Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
* not exceed the write watermark of the {@link Channel} and
* no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
* {@code false}.
*/
public boolean isWritable() {
return unwritable == 0;
}

/**
* Get how many bytes must be drained from the underlying buffer until {@link #isWritable()} returns {@code true}.
* This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
*/
public long bytesBeforeWritable() {
long bytes = totalPendingSize – channel.config().getWriteBufferLowWaterMark();
// If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
// Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
// together. totalPendingSize will be updated before isWritable().
if (bytes > 0) {
return isWritable() ? 0 : bytes;
}
return 0;
}

/**
* Decrement the pending bytes which will be written at some point.
* This method is thread-safe!
*/
void decrementPendingOutboundBytes(long size) {
decrementPendingOutboundBytes(size, true, true);
}

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}

long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}

private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}

private void fireChannelWritabilityChanged(boolean invokeLater) {
final ChannelPipeline pipeline = channel.pipeline();
if (invokeLater) {
Runnable task = fireChannelWritabilityChangedTask;
if (task == null) {
fireChannelWritabilityChangedTask = task = new Runnable() {
@Override
public void run() {
pipeline.fireChannelWritabilityChanged();
}
};
}
channel.eventLoop().execute(task);
} else {
pipeline.fireChannelWritabilityChanged();
}
}

bytesBeforeWritable 方法先判断 totalPendingSize 是否大于 lowWatermark,如果不大于则返回 0,如果大于且 isWritable 返回 true 则返回 0,否则返回差值
decrementPendingOutboundBytes 方法会判断,如果 notifyWritability 为 true 且 newWriteBufferSize < channel.config().getWriteBufferLowWaterMark(),则调用 setWritablesetWritable(invokeLater)
setWritable 会判断是否有变更,有的话,触发 fireChannelWritabilityChanged 进行通知

ChannelOutboundBuffer.bytesBeforeUnwritable
netty-all-4.1.25.Final-sources.jar!/io/netty/channel/ChannelOutboundBuffer.java
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, “unwritable”);

private volatile int unwritable;

/**
* Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
* not exceed the write watermark of the {@link Channel} and
* no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
* {@code false}.
*/
public boolean isWritable() {
return unwritable == 0;
}

/**
* Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
* This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
*/
public long bytesBeforeUnwritable() {
long bytes = channel.config().getWriteBufferHighWaterMark() – totalPendingSize;
// If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
// Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
// together. totalPendingSize will be updated before isWritable().
if (bytes > 0) {
return isWritable() ? bytes : 0;
}
return 0;
}

/**
* Increment the pending bytes which will be written at some point.
* This method is thread-safe!
*/
void incrementPendingOutboundBytes(long size) {
incrementPendingOutboundBytes(size, true);
}

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}

long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}

private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}

private void fireChannelWritabilityChanged(boolean invokeLater) {
final ChannelPipeline pipeline = channel.pipeline();
if (invokeLater) {
Runnable task = fireChannelWritabilityChangedTask;
if (task == null) {
fireChannelWritabilityChangedTask = task = new Runnable() {
@Override
public void run() {
pipeline.fireChannelWritabilityChanged();
}
};
}
channel.eventLoop().execute(task);
} else {
pipeline.fireChannelWritabilityChanged();
}
}

bytesBeforeUnwritable 方法先判断 highWatermark 与 totalPendingSize 的差值,totalPendingSize 大于等于 highWatermark,则返回 0;如果小于 highWatermark,且 isWritable 为 true,则返回差值,否则返回 0
incrementPendingOutboundBytes 方法判断如果 newWriteBufferSize > channel.config().getWriteBufferHighWaterMark(),则调用 setUnwritable(invokeLater)
setUnwritable 会判断是否有变更,有的话,触发 fireChannelWritabilityChanged 进行通知

小结

storm client 的 storm.messaging.netty.buffer.high.watermark(默认 16M) 以及 storm.messaging.netty.buffer.low.watermark(默认 8M) 其实配置的是 netty 的 ChannelOption.WRITE_BUFFER_WATER_MARK
netty 的 WriteBufferWaterMark 主要是控制 ChannelOutboundBuffer 的 bytesBeforeWritable 以及 bytesBeforeUnwritable 方法,通过 lowWatermark 及 highWatermark 参数来控制 ChannelOutboundBuffer 的 buffer 的容量
lowWatermark 及 highWatermark 分别在 decrementPendingOutboundBytes 及 incrementPendingOutboundBytes 方法里头用到,当小于 lowWatermark 或者大于 highWatermark 的时候,分别触发 setWritable 及 setUnwritable,更改 ChannelOutboundBuffer 的 unwritable 字段,进而影响 isWritable 方法;在 isWritable 为 true 的时候会立马执行写请求,当返回 false 的时候,写请求会被放入队列等待 isWritable 为 true 时才能执行这些堆积的写请求

doc

Pipelining and flow control
WriteBufferWaterMark
Netty 4: high and low write watermarks
Netty 水位详解
Netty 那些事儿 ——— Netty 实现“流量整形”原理分析及实战
Set sane WRITE_BUFFER_HIGH_WATER_MARK and WRITE_BUFFER_LOW_WATER_MARK

退出移动版