本文作者 优测性能测试专家高源。
简介:本文以最新的JMeter 5.5版本源代码为例具体介绍了单机模式和分布式模式下后果收集器的工作原理。通篇干货,还不快来理解一下!

一、JMeter后果收集器概述
JMeter是在压力畛域中最常见的性能测试工具,因为其开源的特点,受到宽广测试和开发同学的青眼。然而,在理论利用过程中,JMeter存在的一些性能瓶颈也凸显进去,常常会遇到大并发下压不下来的状况。笔者通过深入分析其源码实现,找到JMeter存在的瓶颈问题及根本原因,为当前更好地应用工具提供一些思路。
后果收集器:在JMeter中负责报告数据收集的重任,无论是单机模式还是master-slave模式,每一个申请的后果都是通过相应的后果收集器进行数据采集的。在单机模式下用Result Collector这个监听器去采集,在分布式(master-slave)场景下通过配RemoteSampleListenerWrapper下的指定sender进行收集,具体配置jmeter.property文件的mode属性和队列长度实现。上面咱们以以后最新的JMeter 5.5版本的源代码为例具体介绍下单机模式和分布式模式下后果收集器的工作原理。
二、单机模式
1、初始化
在命令行模式下,JMeter会依据用户的logfile配置抉择是否增加Result Collector,个别在理论测试的时候,咱们都是须要有具体统计报告生成的,所以都会增加Result Collector,收集器放在了整个hashtree的第一个节点,代码如下:
void runNonGui(String testFile, String logFile, boolean remoteStart, String remoteHostsString, boolean generateReportDashboard){
....
ResultCollector resultCollector = null;
if (logFile != null) {

 resultCollector = new ResultCollector(summariser); resultCollector.setFilename(logFile); clonedTree.add(clonedTree.getArray()[0], resultCollector); }

else {

 // only add Summariser if it can not be shared with the ResultCollector

if (summariser != null) {

  clonedTree.add(clonedTree.getArray()[0], summariser);  }  }

....

}
2、加载流程
增加完后果收集器后,执行脚本过程中,JMeter会依据jmx的编排,依照如下的执行程序进行调用:

每一个线程都是依照以上的程序循环反复执行,直到压测进行。具体代码如下(相应的关键点已减少正文):
private void executeSamplePackage(Sampler current,

  TransactionSampler transactionSampler,  SamplePackage transactionPack,  JMeterContext threadContext) {

threadContext.setCurrentSampler(current);
// Get the sampler ready to sample
SamplePackage pack = compiler.configureSampler(current);
runPreProcessors(pack.getPreProcessors());//运行前置处理器
// Hack: save the package for any transaction controllers
threadVars.putObject(PACKAGE_OBJECT, pack);
delay(pack.getTimers());//定时器timer
SampleResult result = null;
if (running) {

   Sampler sampler = pack.getSampler();   result = doSampling(threadContext, sampler);

}
// If we got any results, then perform processing on the result
if (result != null) {
if (!result.isIgnore()) {

      ...                

runPostProcessors(pack.getPostProcessors());//运行后置处理器
checkAssertions(pack.getAssertions(), result, threadContext);//运行断言处理器

        // PostProcessors can call setIgnore, so reevaluate here        if (!result.isIgnore()) {        // Do not send subsamples to listeners which receive the transaction sample        List<SampleListener> sampleListeners = getSampleListeners(pack, transactionPack, transactionSampler);        notifyListeners(sampleListeners, result);//执行监听器,此处为执行报告收集器的sampleOccurred办法        }        compiler.done(pack);        ...}

收集器Result Collector执行的具体代码:
@Override
public void sampleOccurred(SampleEvent event) {

SampleResult result = event.getResult();if (isSampleWanted(result.isSuccessful())) {    sendToVisualizer(result);    if (out != null && !isResultMarked(result) && !this.isStats) {    SampleSaveConfiguration config = getSaveConfig();    result.setSaveConfig(config);    try {           if (config.saveAsXml()) {               SaveService.saveSampleResult(event, out);           } else { // !saveAsXml               CSVSaveService.saveSampleResult(event, out);           }      } catch (Exception err) {          log.error("Error trying to record a sample", err); // should throw exception back to caller       }  }

}
if(summariser != null) {

   summariser.sampleOccurred(event);

}
}

以上次要实现了将每个申请的后果数据存储到日志文件中(CSV /XML),为后续的报告生成提供数据文件。
3、性能瓶颈剖析
从以上的流程不难看出,因为每个线程的每个申请后都会频繁调用Result Collector的sample Occurred办法,即会频繁读写文件,有可能导致IO瓶颈。一旦存储的速度降落,必然导致线程循环发包的速度降落,从而导致压不下来的状况呈现。所以单机模式下不倡议设置超过200以上的并发,若非必须,尽量敞开日志采集和html报告生成,免得报告置信度存在问题。

三、分布式模式
为了应答单机的各种瓶颈问题,JMeter采纳了分布式(master-slave)模式。加载执行流程与单机基本一致,不再赘述,区别在于监听器换成了Remote Sample ListenerImpl收集器。
1、发送模式指定办法
上面咱们重点看下Remote Sample ListenerImpl监听器的代码:
@Override
public void processBatch(List<SampleEvent> samples) {

if (samples != null && sampleListener != null) {    for (SampleEvent e : samples) {        sampleListener.sampleOccurred(e);    }}

}
@Override
public void sampleOccurred(SampleEvent e) {

if (sampleListener != null) {    sampleListener.sampleOccurred(e);}

}

从以上代码能够看出,这个监听器里又调用了sample Listener的sample Occurred办法,而sample Listener是通过用户在jmeter.property文件中指定的。

2、AsynchSampleSender源码解析
上面咱们以Asynch Sample Sender为例进行源码具体介绍:
public class AsynchSampleSender extends AbstractSampleSender implements Serializable {

   protected Object readResolve() throws ObjectStreamException{    int capacity = getCapacity();    log.info("Using batch queue size (asynch.batch.queue.size): {}", capacity); // server log file    queue = new ArrayBlockingQueue<>(capacity);    Worker worker = new Worker(queue, listener);    worker.setDaemon(true);    worker.start();    return this;}

@Override
public void testEnded(String host)

log.debug("Test Ended on {}", host);try {    listener.testEnded(host);    queue.put(FINAL_EVENT);} catch (Exception ex) {    log.warn("testEnded(host)", ex);}if (queueWaits > 0) {    log.info("QueueWaits: {}; QueueWaitTime: {} (nanoseconds)", queueWaits, queueWaitTime);    }}

@Override
public void sampleOccurred(SampleEvent e)

try {    if (!queue.offer(e)){ // we failed to add the element first time        queueWaits++;        long t1 = System.nanoTime();        queue.put(e);        long t2 = System.nanoTime();        queueWaitTime += t2-t1;    }} catch (Exception err) {    log.error("sampleOccurred; failed to queue the sample", err);}

}
private static class Worker extends Thread {

@Overridepublic void run()     try {        boolean eof = false;        while (!eof) {            List<SampleEvent> l = new ArrayList<>();            SampleEvent e = queue.take();            // try to process as many as possible            // The == comparison is not an error            while (!(eof = e == FINAL_EVENT) && e != null) {                 l.add(e);                 e = queue.poll(); // returns null if nothing on queue currently             }            int size = l.size();            if (size > 0) {                try {                   listener.processBatch(l);                } catch (RemoteException err) {                    if (err.getCause() instanceof java.net.ConnectException){                        throw new JMeterError("Could not return sample",err);                    }                    log.error("Failed to return sample", err);                }             }        }    } catch (InterruptedException e) {        Thread.currentThread().interrupt();        }    log.debug("Worker ended");    }}

}

从以上代码能够看出,Asynch SampleSender的sample Occurred办法里只进行入列的操作,而采集上报工作是启动了一个work线程实现的,相当于异步解决所有申请数据。这样设计不会阻塞发包的流程,性能上要优于单机模式。然而,在肯定状况下,也是会呈现性能瓶颈的。
这个队列采纳的是Array Blocking Queue(阻塞队列),这个队列有如下特点:
·Array Blocking Queue是有界的初始化必须指定大小,队列满了后,无奈入列。
·Array Blocking Queue实现的队列中的锁是没有拆散的,即增加操作和移除操作采纳的同一个Reenter Lock锁。
3、性能瓶颈剖析
瓶颈点一:队列大小问题
当咱们理论压测过程中,如果队列大小(asynch.batch.queue.size)设置过小,入列速度大于出列速度,就会导致队列满而阻塞整个发压流程,而如果队列设置过大,一旦申请的包体比拟大,很容易造成内存溢出。
瓶颈点二:繁多锁问题
在压测过程中,入列出列是十分频繁的,而同一个Reenter Lock锁也可能造成入列和入列过程中,因无奈取得锁而入列或者出列提早,继而影响发压效率。
四、总结
JMeter因其欠缺的社区和开源特点,在日常压测中可宽泛应用。JMeter适宜进行小规模的压测。然而在大规模的压测过程中,受本地机器性能、带宽等限度,不宜进行单机压测,能够应用JMeter的master-slave的形式进行分布式压测。然而需提前设置好后果收集器和队列的大小,并进行事后演练评估出下限qps,防止出现压不下来的状况。此外,master-slave通信形式是近程RMI的双向通信形式,连接数过多也会造成master的瓶颈呈现,须要做好量级的提前评估。

*版权申明:本文作者 优测性能测试专家高源。

优测压力测试是一款在线云原生全链路压测平台,百万级并发即召即用。兼容JMeter脚本,一键上传即可随时发压,免去压测工具搭建老本。欢送大家登录优测官网收费体验!

想理解更多压力测试常识、产品与服务,可扫下方二维码进群交换。

分割咱们
热线:
0755-86013388-843186
官网:
https://utest.21kunpeng.com/h...
企业微信: