- 背景:
我的项目须要对接confluent-kafka压测,查看生产端的性能状况。并且confluent-kafka开了SSL验证,须要账号密码,如果间接用jmeter的kafka插件,是不满足应用需要,所以只能独自从新写一个对接confluent-kafka的插件!!! - 测试场景
模仿场景,1并发,发送100笔音讯,发送雷同的内容 先上后果:
1.第一次写的 JavaSampler 插件后果:
均匀每笔600ms左右,TPS只有1.7/s- 第二次批改后的 JavaSampler 插件后果:
均匀每笔7ms左右,TPS能到130/s,(如果不限度申请量,TPS还能再高点,能到1000左右)
从下面的后果很显著的看进去,第一个写的就是垃圾,(ps. 因为之前用spring框架曾经验证过了,每秒confluent-kafka的性能能到1000左右)所以,排除人家中间件的锅,那就是本人写了个垃圾进去,而后就是漫长的排查之路!!!
- 上代码吧:
public void product(Properties props, String topic, String key, String value) throws InterruptedException, InstantiationException, IllegalAccessException { //判断topic,来实例化对象 Class avroType = null; switch (topic){ case "staging-shareservice-masterdata-style": avroType = ProductStyle.class; break; case "staging-shareservice-masterdata-styleoption": avroType = ProductStyleOption.class; break; case "staging-shareservice-masterdata-sku": avroType = ProductSku.class; break; case "staging-shareservice-masterdata-price": avroType = Price.class; break; case "staging-shareservice-masterdata-location-standard" : avroType = LocationStandard.class; } //序列化value Object avroValue = avroValueSerializer.avroValue(value, avroType); //筹备生产者 KafkaProducer<String, Object> producer = new KafkaProducer<>(props); ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, avroValue); try { // 1、发送音讯 producer.send(record); } catch (Exception e) { e.printStackTrace(); }// producer.close(); }
就是下面这段发送逻辑,太菜了,看jmeter日志,发现频繁的打印配置信息,每发一次打印一次,很显著每次发送都加载了配置信息导致的,配置信息个别都是初始化的时候加载一次,前面复用就行了,好了点找到了,接下来就是看哪里加载的配置信息了,而后就开始低效调优。
1、先把配置类初始化放setup里,后果不言而喻有效;
2、把KafkaProducer也放setup中,尝试了一下,发现效果显著;
哈哈,问题找到, 成果也很显著,最初的代码
myKafkaProducer myKafkaProducer = null; Properties props = null; //筹备生产者 KafkaProducer<String, Object> producer = null;// 发送内容对象 ProducerRecord<String, Object> record = null; //序列化value类型 Object avroValue = null; //初始化 public void setupTest(JavaSamplerContext context) { myKafkaProducer = new myKafkaProducer(); String paramBroker = context.getParameter("broker"); String paramTopic = context.getParameter("topic"); String paramKey = context.getParameter("key"); String paramValue = context.getParameter("value"); //初始化配置信息 props = myKafkaProducer.initNewConfig(paramBroker); //筹备生产者 producer = new KafkaProducer<>(props); //判断topic,来实例化对象 Class avroType = null; switch (paramTopic){ case "staging-shareservice-masterdata-style": avroType = ProductStyle.class; break; case "staging-shareservice-masterdata-styleoption": avroType = ProductStyleOption.class; break; case "staging-shareservice-masterdata-sku": avroType = ProductSku.class; break; case "staging-shareservice-masterdata-price": avroType = Price.class; break; case "staging-shareservice-masterdata-location-standard" : avroType = LocationStandard.class; } try { avroValue = avroValueSerializer.avroValue(paramValue, avroType); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } } @Override public SampleResult runTest(JavaSamplerContext javaSamplerContext) { SampleResult result = this.newSampleResult(); String paramTopic = javaSamplerContext.getParameter("topic"); String paramKey = javaSamplerContext.getParameter("key"); String paramValue = javaSamplerContext.getParameter("value"); StringBuilder paramStr = new StringBuilder("topic:") .append(paramTopic).append(",\nkey:") .append(paramKey).append(", \nvalue:") .append(paramValue); sampleResultStart(result, paramStr.toString()); record = new ProducerRecord<>(paramTopic, paramKey, avroValue); try { // 1、发送音讯 producer.send(record); sampleResultSuccess(result, "异步发送胜利"); }catch (Exception ex){ sampleResultFailed(result, "500", ex); } return result; }