- Background:
My project needed to load-test its confluent-kafka integration and measure producer-side performance. The confluent-kafka cluster has SSL authentication enabled and requires a username and password (roughly the security settings sketched below), which JMeter's stock Kafka plugin cannot handle, so the only option was to write a dedicated JavaSampler plugin for confluent-kafka from scratch!
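For reference, this is roughly the kind of producer security configuration involved. A minimal sketch, assuming Confluent's usual SASL_SSL + PLAIN setup with the Avro serializer and Schema Registry; the endpoints and credentials below are placeholders, not the project's real values:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "broker-1:9093");            // placeholder broker address
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
// The username/password mentioned above go into the JAAS config
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"<api-key>\" password=\"<api-secret>\";");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "https://schema-registry:8081");   // placeholder
```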
- Test scenario:
Simulated scenario: 1 concurrent thread sending 100 messages, all with the same content.
- Results first:
1. First version of the JavaSampler plugin: roughly 600 ms per message on average, TPS only about 1.7/s.
2. Revised version of the JavaSampler plugin: roughly 7 ms per message on average, TPS around 130/s (and without capping the number of requests, TPS can go higher, to roughly 1000).
(With a single thread, TPS is essentially the inverse of the per-message latency: 1000 ms / 600 ms ≈ 1.7/s and 1000 ms / 7 ms ≈ 140/s, so the latency and TPS figures are consistent.)
The results above make it obvious that the first version was garbage. (ps. the same confluent-kafka had already been verified with a Spring-based producer at around 1000 messages per second, so the middleware was not to blame; I had simply written something bad, and then began the long road of troubleshooting!)
- The code:
```java
public void product(Properties props, String topic, String key, String value)
        throws InterruptedException, InstantiationException, IllegalAccessException {
    // Pick the Avro class to instantiate based on the topic
    Class avroType = null;
    switch (topic) {
        case "staging-shareservice-masterdata-style":
            avroType = ProductStyle.class;
            break;
        case "staging-shareservice-masterdata-styleoption":
            avroType = ProductStyleOption.class;
            break;
        case "staging-shareservice-masterdata-sku":
            avroType = ProductSku.class;
            break;
        case "staging-shareservice-masterdata-price":
            avroType = Price.class;
            break;
        case "staging-shareservice-masterdata-location-standard":
            avroType = LocationStandard.class;
            break;
    }
    // Serialize the value into an Avro object
    Object avroValue = avroValueSerializer.avroValue(value, avroType);
    // Create the producer
    KafkaProducer<String, Object> producer = new KafkaProducer<>(props);
    ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, avroValue);
    try {
        // 1. Send the message
        producer.send(record);
    } catch (Exception e) {
        e.printStackTrace();
    }
    // producer.close();
}
```
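The avroValueSerializer helper used above is this project's own class and its body is not shown in the post. A minimal sketch of what avroValue(value, avroType) might do, assuming the value string is JSON and the generated classes (ProductStyle, ProductSku, ...) are Avro SpecificRecord types; this is illustrative, not the exact implementation:

```java
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;

import java.io.IOException;

// Hypothetical stand-in for the avroValueSerializer helper.
public class AvroValueSerializerSketch {

    // Build an instance of the generated Avro class from a JSON string so that
    // KafkaAvroSerializer can serialize it afterwards.
    public Object avroValue(String json, Class avroType)
            throws InstantiationException, IllegalAccessException {
        // Instantiate the generated class only to obtain its schema
        SpecificRecord prototype = (SpecificRecord) avroType.newInstance();
        Schema schema = prototype.getSchema();
        try {
            DatumReader<Object> reader = new SpecificDatumReader<>(schema);
            Decoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new IllegalArgumentException(
                    "value is not valid Avro JSON for schema " + schema.getFullName(), e);
        }
    }
}
```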
The culprit was the product(...) send logic above. Looking at the JMeter log, the producer configuration was being printed over and over, once per send, which clearly meant the configuration was being loaded on every single send. Configuration should normally be loaded once at initialization and then reused. With that clue, the next step was to find where the configuration was being loaded, and the slow grind of tuning began.
1. First, moving the configuration initialization into setup: predictably, this had no effect on its own;
2. Then moving the KafkaProducer construction into setup as well: this made a dramatic difference.
That makes sense: constructing a KafkaProducer is expensive (it parses and logs the configuration, opens connections, fetches cluster metadata and starts a background I/O thread), so it should be created once per thread and reused rather than rebuilt for every message.
Ha, problem found, and the improvement was obvious. The final code:
```java
myKafkaProducer myKafkaProducer = null;
Properties props = null;
// Producer
KafkaProducer<String, Object> producer = null;
// Record to send
ProducerRecord<String, Object> record = null;
// Serialized Avro value
Object avroValue = null;

// Initialization: runs once per JMeter thread
@Override
public void setupTest(JavaSamplerContext context) {
    myKafkaProducer = new myKafkaProducer();
    String paramBroker = context.getParameter("broker");
    String paramTopic = context.getParameter("topic");
    String paramKey = context.getParameter("key");
    String paramValue = context.getParameter("value");
    // Load the configuration once
    props = myKafkaProducer.initNewConfig(paramBroker);
    // Create the producer once and reuse it in runTest
    producer = new KafkaProducer<>(props);
    // Pick the Avro class to instantiate based on the topic
    Class avroType = null;
    switch (paramTopic) {
        case "staging-shareservice-masterdata-style":
            avroType = ProductStyle.class;
            break;
        case "staging-shareservice-masterdata-styleoption":
            avroType = ProductStyleOption.class;
            break;
        case "staging-shareservice-masterdata-sku":
            avroType = ProductSku.class;
            break;
        case "staging-shareservice-masterdata-price":
            avroType = Price.class;
            break;
        case "staging-shareservice-masterdata-location-standard":
            avroType = LocationStandard.class;
            break;
    }
    try {
        avroValue = avroValueSerializer.avroValue(paramValue, avroType);
    } catch (InstantiationException e) {
        e.printStackTrace();
    } catch (IllegalAccessException e) {
        e.printStackTrace();
    }
}

@Override
public SampleResult runTest(JavaSamplerContext javaSamplerContext) {
    SampleResult result = this.newSampleResult();
    String paramTopic = javaSamplerContext.getParameter("topic");
    String paramKey = javaSamplerContext.getParameter("key");
    String paramValue = javaSamplerContext.getParameter("value");
    StringBuilder paramStr = new StringBuilder("topic:")
            .append(paramTopic).append(",\nkey:")
            .append(paramKey).append(", \nvalue:")
            .append(paramValue);
    sampleResultStart(result, paramStr.toString());
    record = new ProducerRecord<>(paramTopic, paramKey, avroValue);
    try {
        // 1. Send the message (asynchronously)
        producer.send(record);
        sampleResultSuccess(result, "Async send succeeded");
    } catch (Exception ex) {
        sampleResultFailed(result, "500", ex);
    }
    return result;
}
```
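The sampler above relies on a few pieces that are not shown: the newSampleResult / sampleResultStart / sampleResultSuccess / sampleResultFailed helpers, the parameter definitions for the Java Request GUI, and a teardown that closes the producer created in setupTest. A minimal sketch of what they could look like, built on JMeter's standard SampleResult and Arguments APIs; the method names follow the calls above, but the bodies and default values are illustrative, not the exact original code:

```java
// These would sit in the same sampler class; imports shown for completeness.
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;

// Expose the parameters read via context.getParameter(...) in the Java Request GUI
// (default values here are placeholders).
@Override
public Arguments getDefaultParameters() {
    Arguments args = new Arguments();
    args.addArgument("broker", "localhost:9092");
    args.addArgument("topic", "staging-shareservice-masterdata-style");
    args.addArgument("key", "");
    args.addArgument("value", "");
    return args;
}

private SampleResult newSampleResult() {
    SampleResult result = new SampleResult();
    result.setDataEncoding("UTF-8");
    result.setDataType(SampleResult.TEXT);
    return result;
}

// Record the request payload and start the response-time clock
private void sampleResultStart(SampleResult result, String data) {
    result.setSamplerData(data);
    result.sampleStart();
}

// Stop the clock and mark the sample as passed
private void sampleResultSuccess(SampleResult result, String response) {
    result.sampleEnd();
    result.setSuccessful(true);
    result.setResponseCodeOK();
    result.setResponseData(response, "UTF-8");
}

// Stop the clock and mark the sample as failed with the given response code
private void sampleResultFailed(SampleResult result, String code, Exception ex) {
    result.sampleEnd();
    result.setSuccessful(false);
    result.setResponseCode(code);
    result.setResponseMessage(ex.toString());
}

// Close the producer once per thread when the test ends, flushing buffered records
@Override
public void teardownTest(JavaSamplerContext context) {
    if (producer != null) {
        producer.close();
    }
}
```

Closing the producer in teardownTest rather than after every send is what lets the connection and cluster metadata be reused across all 100 messages.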