共计 5686 个字符,预计需要花费 15 分钟才能阅读完成。
背景
明天收到业务团队反馈线上有个利用往 Pulsar 中发送音讯失败了,通过日志查看得悉是发送音讯时候抛出了 java.lang.InterruptedException
异样。
和业务沟通后得悉是在一个 gRPC
接口中触发的音讯发送,大概继续了半个小时的异样后便恢复正常了,这是整个问题的背景。
前置排查
拿到该问题后首先排查下是否是共性问题,查看了其余的利用没有发现相似的异样;同时也查看了 Pulsar broker 的监控大盘,在这个时间段仍然没有稳定和异样;
这样能够初步排除是 Pulsar 服务端的问题。
接着便是查看利用那段时间的负载状况,从利用 QPS 到 JVM 的各个内存状况仍然没发现有什么显著的变动。
Pulsar 源码排查
既然看起来利用自身和 Pulsar broker 都没有问题的话那就只能从异样自身来排查了。
首先第一步要得悉具体应用的是 Pulsar-client
是版本是多少,因为业务应用的是外部基于官网 SDK 封装 springboot starter
所以第一步还得排查这个 starter
是否有影响。
通过查看源码根本排除了 starter
的嫌疑,外面只是简略的封装了 SDK
的性能而已。
org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:91) at
java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89) ... 49 common frames omitted Caused by: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ(ProducerImpl.java:393)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ$accessor$i7NYMN6i(ProducerImpl.java)
at org.apache.pulsar.client.impl.ProducerImpl$auxiliary$EfuVvJLT.call(Unknown Source)
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java)
at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292)
at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363)
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191)
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:82) ... 49 common frames omitted Caused by: java.lang.InterruptedException: null
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343)
at java.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318)
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:758)
接下来便只能是剖析堆栈了,因为 Pulsar-client 的局部实现源码是没有间接打包到依赖中的,反编译的话许多代码行数对不上,所以须要将官网的源码拉到本地,切换到对于的分支进行查看。
这一步略微有点麻烦,首先是代码库还挺大的,加上之前如果没有筹备好 Pulsar 的开发环境的话预计会劝退一部分人;但其实大部分问题都是网络造成的,只有配置一些 Maven 镜像多试几次总会编译胜利。
我这里间接将分支切换到 branch-2.8
。
从堆栈的顶部开始排查 TypedMessageBuilderImpl.java:91
:
看起来是外部异步发送音讯的时候抛了异样。
接着往下看到这里:
java.lang.InterruptedException
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) at
看起来是这里没错,然而代码行数显著不对;因为 2.8 这个分支也是修复过几个版本,所以两头有批改导致代码行数与最新代码对不上也失常。
semaphore.get().acquire();
不过初步来看应该是这行代码抛出的线程终端异样,这里看起来只有他最有可能了。
为了确认是否是真的是这行代码,这个文件再往前翻了几个版本最终确认了就是这行代码没错了。
咱们点开 java.util.concurrent.Semaphore#acquire()
的源码,
/**
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* for a permit,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
*/
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {if (Thread.interrupted() ||
(tryAcquireShared(arg) < 0 &&
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();}
通过源码会发现 acquire()
函数的确会响应中断,一旦检测到以后线程被中断后便会抛出 InterruptedException
异样。
定位问题
所以问题的起因根本确定了,就是在 Pulsar 的发送音讯线程被中断了导致的,但为啥会被中断还须要持续排查。
咱们晓得线程中断是须要调用 Thread.currentThread().interrupt();
API 的,首先猜想是否 Pulsar 客户端外部有个线程中断了这个发送线程。
于是我在 pulsar-client
这个模块中搜寻了相干代码:
排除掉和 producer 不相干的中央,其余所有中断线程的代码都是在有了该异样之后持续传递而已;所以初步来看 pulsar-client 外部没有被动中断的操作。
既然 Pulsar 本人没有做,那就只可能是业务做的了?
于是我在业务代码中搜寻了一下:
果然在业务代码中搜到了惟一一处中断的中央,而且通过调用关系得悉这段代码是在音讯发送前执行的,并且和 Pulsar 发送函数处于同一线程。
大略的伪代码如下:
List.of(1, 2, 3).stream().map(e -> {return CompletableFuture.supplyAsync(() -> {
try {TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException ex) {throw new RuntimeException(ex);
}
return e;
});
}
).collect(Collectors.toList()).forEach(f -> {
try {Integer integer = f.get();
log.info("====" + integer);
if (integer==3){TimeUnit.SECONDS.sleep(10);
Thread.currentThread().interrupt();
}
} catch (InterruptedException e) {throw new RuntimeException(e);
} catch (ExecutionException e) {throw new RuntimeException(e);
}
});
MessageId send = producer.newMessage().value(msg.getBytes()).send();
执行这段代码能够齐全复现同样的堆栈。
幸好中断这里还打得有日志:
通过日志搜寻发现异常的工夫和这个中断的日志工夫点齐全重合,这样也就晓得根本原因了。
因为业务线程和音讯发送线程是同一个,在某些状况下会执行 Thread.currentThread().interrupt();
,其实单纯执行这行函数并不会产生什么,只有没有去响应这个中断,也就是 Semaphore
源码中的判断了线程中断的标记:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {if (Thread.interrupted() ||
(tryAcquireShared(arg) < 0 &&
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();}
但恰好这里业务中断后本人并没有去判断这个标记,导致 Pulsar 外部去判断了,最终抛出了这个异样。
总结
所以归根结底还是这里的代码不合理导致的,首先是本人中断了线程但也没应用,从而导致有被其余根底库应用的可能,所以会造成了一些不可预知的结果。
再一个是不倡议在业务代码中应用 Thread.currentThread().interrupt();
这类代码,第一眼基本不晓得是要干啥,也不易保护。
其实实质上线程中断也是线程间通信的一种伎俩,有这类需要齐全能够换为内置的 BlockQueue
这类函数来实现。