关于性能测试:优分享JMeter源码解析之结果收集器

37次阅读

共计 6225 个字符,预计需要花费 16 分钟才能阅读完成。

本文作者 优测性能测试专家高源。
简介:本文以最新的 JMeter 5.5 版本源代码为例具体介绍了单机模式和分布式模式下后果收集器的工作原理。通篇干货,还不快来理解一下!

一、JMeter 后果收集器概述
JMeter 是在压力畛域中最常见的性能测试工具,因为其开源的特点,受到宽广测试和开发同学的青眼。然而,在理论利用过程中,JMeter 存在的一些性能瓶颈也凸显进去,常常会遇到大并发下压不下来的状况。笔者通过深入分析其源码实现,找到 JMeter 存在的瓶颈问题及根本原因,为当前更好地应用工具提供一些思路。
后果收集器:在 JMeter 中负责报告数据收集的重任,无论是单机模式还是 master-slave 模式,每一个申请的后果都是通过相应的后果收集器进行数据采集的。在单机模式下用 Result Collector 这个监听器去采集,在分布式(master-slave)场景下通过配 RemoteSampleListenerWrapper 下的指定 sender 进行收集,具体配置 jmeter.property 文件的 mode 属性和队列长度实现。上面咱们以以后最新的 JMeter 5.5 版本的源代码为例具体介绍下单机模式和分布式模式下后果收集器的工作原理。
二、单机模式
1、初始化
在命令行模式下,JMeter 会依据用户的 logfile 配置抉择是否增加 Result Collector,个别在理论测试的时候,咱们都是须要有具体统计报告生成的,所以都会增加 Result Collector,收集器放在了整个 hashtree 的第一个节点,代码如下:
void runNonGui(String testFile, String logFile, boolean remoteStart, String remoteHostsString, boolean generateReportDashboard){
….
ResultCollector resultCollector = null;
if (logFile != null) {

 resultCollector = new ResultCollector(summariser);
 resultCollector.setFilename(logFile);
 clonedTree.add(clonedTree.getArray()[0], resultCollector);
 }

else {

 // only add Summariser if it can not be shared with the ResultCollector

if (summariser != null) {

  clonedTree.add(clonedTree.getArray()[0], summariser);
  }
  }

….

}
2、加载流程
增加完后果收集器后,执行脚本过程中,JMeter 会依据 jmx 的编排,依照如下的执行程序进行调用:

每一个线程都是依照以上的程序循环反复执行,直到压测进行。具体代码如下(相应的关键点已减少正文):
private void executeSamplePackage(Sampler current,

  TransactionSampler transactionSampler,
  SamplePackage transactionPack,
  JMeterContext threadContext) {

threadContext.setCurrentSampler(current);
// Get the sampler ready to sample
SamplePackage pack = compiler.configureSampler(current);
runPreProcessors(pack.getPreProcessors());// 运行前置处理器
// Hack: save the package for any transaction controllers
threadVars.putObject(PACKAGE_OBJECT, pack);
delay(pack.getTimers());// 定时器 timer
SampleResult result = null;
if (running) {

   Sampler sampler = pack.getSampler();
   result = doSampling(threadContext, sampler);

}
// If we got any results, then perform processing on the result
if (result != null) {
if (!result.isIgnore()) {

      ...                

runPostProcessors(pack.getPostProcessors());// 运行后置处理器
checkAssertions(pack.getAssertions(), result, threadContext);// 运行断言处理器

        // PostProcessors can call setIgnore, so reevaluate here
        if (!result.isIgnore()) {
        // Do not send subsamples to listeners which receive the transaction sample
        List<SampleListener> sampleListeners = getSampleListeners(pack, transactionPack, transactionSampler);
        notifyListeners(sampleListeners, result);// 执行监听器, 此处为执行报告收集器的 sampleOccurred 办法
        }
        compiler.done(pack);
        ...
}

收集器 Result Collector 执行的具体代码:
@Override
public void sampleOccurred(SampleEvent event) {

SampleResult result = event.getResult();
if (isSampleWanted(result.isSuccessful())) {sendToVisualizer(result);
    if (out != null && !isResultMarked(result) && !this.isStats) {SampleSaveConfiguration config = getSaveConfig();
    result.setSaveConfig(config);
    try {if (config.saveAsXml()) {SaveService.saveSampleResult(event, out);
           } else { // !saveAsXml
               CSVSaveService.saveSampleResult(event, out);
           }
      } catch (Exception err) {log.error("Error trying to record a sample", err); // should throw exception back to caller
       }
  }

}
if(summariser != null) {

   summariser.sampleOccurred(event);

}
}

以上次要实现了将每个申请的后果数据存储到日志文件中(CSV /XML),为后续的报告生成提供数据文件。
3、性能瓶颈剖析
从以上的流程不难看出,因为每个线程的每个申请后都会频繁调用 Result Collector 的 sample Occurred 办法,即会频繁读写文件,有可能导致 IO 瓶颈。一旦存储的速度降落,必然导致线程循环发包的速度降落,从而导致压不下来的状况呈现。所以单机模式下不倡议设置超过 200 以上的并发,若非必须,尽量敞开日志采集和 html 报告生成,免得报告置信度存在问题。

三、分布式模式
为了应答单机的各种瓶颈问题,JMeter 采纳了分布式(master-slave)模式。加载执行流程与单机基本一致,不再赘述,区别在于监听器换成了 Remote Sample ListenerImpl 收集器。
1、发送模式指定办法
上面咱们重点看下 Remote Sample ListenerImpl 监听器的代码:
@Override
public void processBatch(List<SampleEvent> samples) {

if (samples != null && sampleListener != null) {for (SampleEvent e : samples) {sampleListener.sampleOccurred(e);
    }
}

}
@Override
public void sampleOccurred(SampleEvent e) {

if (sampleListener != null) {sampleListener.sampleOccurred(e);
}

}

从以上代码能够看出,这个监听器里又调用了 sample Listener 的 sample Occurred 办法,而 sample Listener 是通过用户在 jmeter.property 文件中指定的。

2、AsynchSampleSender 源码解析
上面咱们以 Asynch Sample Sender 为例进行源码具体介绍:
public class AsynchSampleSender extends AbstractSampleSender implements Serializable {

   protected Object readResolve() throws ObjectStreamException{int capacity = getCapacity();
    log.info("Using batch queue size (asynch.batch.queue.size): {}", capacity); // server log file
    queue = new ArrayBlockingQueue<>(capacity);
    Worker worker = new Worker(queue, listener);
    worker.setDaemon(true);
    worker.start();
    return this;
}

@Override
public void testEnded(String host)

log.debug("Test Ended on {}", host);
try {listener.testEnded(host);
    queue.put(FINAL_EVENT);
} catch (Exception ex) {log.warn("testEnded(host)", ex);
}
if (queueWaits > 0) {log.info("QueueWaits: {}; QueueWaitTime: {} (nanoseconds)", queueWaits, queueWaitTime);
    }
}

@Override
public void sampleOccurred(SampleEvent e)

try {if (!queue.offer(e)){ // we failed to add the element first time
        queueWaits++;
        long t1 = System.nanoTime();
        queue.put(e);
        long t2 = System.nanoTime();
        queueWaitTime += t2-t1;
    }
} catch (Exception err) {log.error("sampleOccurred; failed to queue the sample", err);
}

}
private static class Worker extends Thread {

@Override
public void run() 
    try {
        boolean eof = false;
        while (!eof) {List<SampleEvent> l = new ArrayList<>();
            SampleEvent e = queue.take();
            // try to process as many as possible
            // The == comparison is not an error
            while (!(eof = e == FINAL_EVENT) && e != null) {l.add(e);
                 e = queue.poll(); // returns null if nothing on queue currently}
            int size = l.size();
            if (size > 0) {
                try {listener.processBatch(l);
                } catch (RemoteException err) {if (err.getCause() instanceof java.net.ConnectException){throw new JMeterError("Could not return sample",err);
                    }
                    log.error("Failed to return sample", err);
                }
             }
        }
    } catch (InterruptedException e) {Thread.currentThread().interrupt();}
    log.debug("Worker ended");
    }
}

}

从以上代码能够看出,Asynch SampleSender 的 sample Occurred 办法里只进行入列的操作,而采集上报工作是启动了一个 work 线程实现的,相当于异步解决所有申请数据。这样设计不会阻塞发包的流程,性能上要优于单机模式。然而,在肯定状况下,也是会呈现性能瓶颈的。
这个队列采纳的是 Array Blocking Queue(阻塞队列),这个队列有如下特点:
·Array Blocking Queue 是有界的初始化必须指定大小,队列满了后,无奈入列。
·Array Blocking Queue 实现的队列中的锁是没有拆散的,即增加操作和移除操作采纳的同一个 Reenter Lock 锁。
3、性能瓶颈剖析
瓶颈点一:队列大小问题
当咱们理论压测过程中,如果队列大小(asynch.batch.queue.size)设置过小,入列速度大于出列速度,就会导致队列满而阻塞整个发压流程,而如果队列设置过大,一旦申请的包体比拟大,很容易造成内存溢出。
瓶颈点二:繁多锁问题
在压测过程中,入列出列是十分频繁的,而同一个 Reenter Lock 锁也可能造成入列和入列过程中,因无奈取得锁而入列或者出列提早,继而影响发压效率。
四、总结
JMeter 因其欠缺的社区和开源特点,在日常压测中可宽泛应用。JMeter 适宜进行小规模的压测。然而在大规模的压测过程中,受本地机器性能、带宽等限度,不宜进行单机压测,能够应用 JMeter 的 master-slave 的形式进行分布式压测。然而需提前设置好后果收集器和队列的大小,并进行事后演练评估出下限 qps,防止出现压不下来的状况。此外,master-slave 通信形式是近程 RMI 的双向通信形式,连接数过多也会造成 master 的瓶颈呈现,须要做好量级的提前评估。

* 版权申明:本文作者 优测性能测试专家高源。

优测压力测试是一款在线云原生全链路压测平台,百万级并发即召即用。兼容 JMeter 脚本,一键上传即可随时发压,免去压测工具搭建老本。欢送大家登录优测官网收费体验!

想理解更多压力测试常识、产品与服务,可扫下方二维码进群交换。

分割咱们
热线:
0755-86013388-843186
官网:
https://utest.21kunpeng.com/h…
企业微信:

正文完
 0