一、问题零碎介绍

  1. 监听商品变更MQ音讯,查问商品最新的信息,调用BulkProcessor批量更新ES集群中的商品字段信息;
  2. 因为商品数据十分多,所以将商品数据存储到ES集群上,整个ES集群共划分了256个分片,并依据商品的三级类目ID进行分片路由。

比方一个SKU的商品名称发生变化,咱们就会收到这个SKU的变更MQ音讯,而后再去查问商品接口,将商品的最新名称查问回来,再依据这个SKU的三级分类ID进行路由,找到对应的ES集群分片,而后更新商品名称字段信息。

因为商品变更MQ音讯量微小,为了晋升更新ES的性能,防止出现MQ音讯积压问题,所以本零碎应用了BulkProcessor进行批量异步更新。

ES客户端版本如下:

        <dependency>            <artifactId>elasticsearch-rest-client</artifactId>            <groupId>org.elasticsearch.client</groupId>            <version>6.5.3</version>        </dependency>

BulkProcessor配置伪代码如下:

        //在这里调用build()办法结构bulkProcessor,在底层实际上是用了bulk的异步操作        this.fullDataBulkProcessor = BulkProcessor.builder((request, bulkListener) ->                fullDataEsClient.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)                // 1000条数据申请执行一次bulk                .setBulkActions(1000)                // 5mb的数据刷新一次bulk                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))                // 并发申请数量, 0不并发, 1并发容许执行                .setConcurrentRequests(1)                // 固定1s必须刷新一次                .setFlushInterval(TimeValue.timeValueSeconds(1L))                // 重试5次,距离1s                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))                .build();

二、问题怎么发现的

  1. 618大促开始后,因为商品变更MQ音讯十分频繁,MQ音讯每天的音讯量更是达到了日常的数倍,而且好多商品还变更了三级类目ID;
  2. 零碎在更新这些三级类目ID发生变化的SKU商品信息时,依据批改后的三级类目ID路由后的分片更新商品信息时产生了谬误,并且重试了5次,仍然没有胜利;
  3. 因为在新路由的分片上没有这个商品的索引信息,这些更新申请永远也不会执行胜利,零碎的日志文件中也记录了大量的异样重试日志。
  4. 商品变更MQ音讯也开始呈现了积压报警,MQ音讯的生产速度显著赶不上生产速度。
  5. 察看MQ音讯消费者的UMP监控数据,发现生产性能很安稳,没有显著稳定,然而调用次数会在零碎生产MQ一段时间后呈现断崖式降落,由原来的每分钟几万调用量逐步降落到个位数。
  6. 在重启利用后,零碎又开始生产,UMP监控调用次数复原到失常程度,然而零碎运行一段时间后,还是会呈现生产暂停问题,好像所有生产线程都被暂停了一样。

三、排查问题的具体过程

首先找一台暂停生产MQ音讯的容器,查看利用过程ID,应用jstack命令dump利用过程的整个线程堆栈信息,将导出的线程堆栈信息打包上传到 https://fastthread.io/ 进行线程状态剖析。剖析报告如下:

通过剖析报告发现有124个处于BLOCKED状态的线程,而后能够点击查看各线程的具体堆栈信息,堆栈信息如下:

间断查看多个线程的具体堆栈信息,MQ生产线程都是在waiting to lock <0x00000005eb781b10> (a org.elasticsearch.action.bulk.BulkProcessor),而后依据0x00000005eb781b10去搜寻发现,这个对象锁正在被另外一个线程占用,占用线程堆栈信息如下:

这个线程状态此时正处于WAITING状态,通过线程名称发现,该线程应该是ES客户端外部线程。正是该线程抢占了业务线程的锁,而后又在期待其余条件触发该线程执行,所以导致了所有的MQ生产业务线程始终无奈获取BulkProcessor外部的锁,导致呈现了生产暂停问题。

然而这个线程elasticsearchscheduler为啥不能执行? 它是什么时候启动的? 又有什么作用?

就须要咱们对BulkProcessor进行深入分析,因为BulkProcessor是通过builder模块进行创立的,所以深刻builder源码,理解一下BulkProcessor的创立过程。

public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {        Objects.requireNonNull(consumer, "consumer");        Objects.requireNonNull(listener, "listener");        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);        return new Builder(consumer, listener,                (delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),                () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));    }

外部创立了一个工夫调度执行线程池,线程命名规定和上述持有锁的线程名称类似,具体代码如下:

static ScheduledThreadPoolExecutor initScheduler(Settings settings) {        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,                EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);        scheduler.setRemoveOnCancelPolicy(true);        return scheduler;    }

最初在build办法外部执行了BulkProcessor的外部有参构造方法,在构造方法外部启动了一个周期性执行的flushing工作,代码如下

 BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,                  int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,                  Scheduler scheduler, Runnable onClose) {        this.bulkActions = bulkActions;        this.bulkSize = bulkSize.getBytes();        this.bulkRequest = new BulkRequest();        this.scheduler = scheduler;        this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);        // Start period flushing task after everything is setup        this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);        this.onClose = onClose;    }
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {        if (flushInterval == null) {            return new Scheduler.Cancellable() {                @Override                public void cancel() {}                @Override                public boolean isCancelled() {                    return true;                }            };        }        final Runnable flushRunnable = scheduler.preserveContext(new Flush());        return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);    }
class Flush implements Runnable {        @Override        public void run() {            synchronized (BulkProcessor.this) {                if (closed) {                    return;                }                if (bulkRequest.numberOfActions() == 0) {                    return;                }                execute();            }        }    }

通过源代码发现,该flush工作就是在创立BulkProcessor对象时设置的固定工夫flush逻辑,当setFlushInterval办法参数失效,就会启动一个后盾定时flush工作。flush距离,由setFlushInterval办法参数定义。该flush工作在运行期间,也会抢占BulkProcessor对象锁,抢到锁后,才会执行execute办法。具体的办法调用关系源代码如下:

/**     * Adds the data from the bytes to be processed by the bulk processor     */    public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,                                          @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {        bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);        executeIfNeeded();        return this;    }    private void executeIfNeeded() {        ensureOpen();        if (!isOverTheLimit()) {            return;        }        execute();    }    // (currently) needs to be executed under a lock    private void execute() {        final BulkRequest bulkRequest = this.bulkRequest;        final long executionId = executionIdGen.incrementAndGet();        this.bulkRequest = new BulkRequest();        this.bulkRequestHandler.execute(bulkRequest, executionId);    }

而上述代码中的add办法,则是由MQ生产业务线程去调用,在该办法上同样有一个synchronized关键字,所以生产MQ业务线程会和flush工作执行线程间接会存在锁竞争关系。具体MQ生产业务线程调用伪代码如下:

 @Override public void upsertCommonSku(CommonSkuEntity commonSkuEntity) {            String source = JsonUtil.toString(commonSkuEntity);            UpdateRequest updateRequest = new UpdateRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());            updateRequest.doc(source, XContentType.JSON);            IndexRequest indexRequest = new IndexRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());            indexRequest.source(source, XContentType.JSON);            updateRequest.upsert(indexRequest);            updateRequest.routing(commonSkuEntity.getCat3().toString());            fullbulkProcessor.add(updateRequest);}  

通过以上对线程堆栈剖析,发现所有的业务线程都在期待elasticsearchscheduler线程开释BulkProcessor对象锁,然而该线程确始终没有开释该对象锁,从而呈现了业务线程的死锁问题。

联合利用日志文件中呈现的大量异样重试日志,可能与BulkProcessor的异样重试策略无关,而后进一步理解BulkProcessor的异样重试代码逻辑。因为业务线程中提交BulkRequest申请都对立提交到了BulkRequestHandler对象中的execute办法外部进行解决,代码如下:

public final class BulkRequestHandler {    private final Logger logger;    private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;    private final BulkProcessor.Listener listener;    private final Semaphore semaphore;    private final Retry retry;    private final int concurrentRequests;    BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,                       BulkProcessor.Listener listener, Scheduler scheduler, int concurrentRequests) {        assert concurrentRequests >= 0;        this.logger = Loggers.getLogger(getClass());        this.consumer = consumer;        this.listener = listener;        this.concurrentRequests = concurrentRequests;        this.retry = new Retry(backoffPolicy, scheduler);        this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);    }    public void execute(BulkRequest bulkRequest, long executionId) {        Runnable toRelease = () -> {};        boolean bulkRequestSetupSuccessful = false;        try {            listener.beforeBulk(executionId, bulkRequest);            semaphore.acquire();            toRelease = semaphore::release;            CountDownLatch latch = new CountDownLatch(1);            retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {                @Override                public void onResponse(BulkResponse response) {                    try {                        listener.afterBulk(executionId, bulkRequest, response);                    } finally {                        semaphore.release();                        latch.countDown();                    }                }                @Override                public void onFailure(Exception e) {                    try {                        listener.afterBulk(executionId, bulkRequest, e);                    } finally {                        semaphore.release();                        latch.countDown();                    }                }            });            bulkRequestSetupSuccessful = true;            if (concurrentRequests == 0) {                latch.await();            }        } catch (InterruptedException e) {            Thread.currentThread().interrupt();            logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);            listener.afterBulk(executionId, bulkRequest, e);        } catch (Exception e) {            logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);            listener.afterBulk(executionId, bulkRequest, e);        } finally {            if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore                toRelease.run();            }        }    }    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {        if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {            semaphore.release(this.concurrentRequests);            return true;        }        return false;    }}

BulkRequestHandler通过构造方法初始化了一个Retry工作对象,该对象中也传入了一个Scheduler,且该对象和flush工作中传入的是同一个线程池,该线程池外部只保护了一个固定线程。而execute办法首先会先依据Semaphore来管制并发执行数量,该并发数量在构建BulkProcessor时通过参数指定,通过上述配置发现该值配置为1。所以每次只容许一个线程执行该办法。即MQ生产业务线程和flush工作线程,同一时间只能有一个线程能够执行。而后上面在理解一下重试工作是如何执行的,具体看如下代码:

 public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest,                            ActionListener<BulkResponse> listener) {        RetryHandler r = new RetryHandler(backoffPolicy, consumer, listener, scheduler);        r.execute(bulkRequest);    }

RetryHandler外部会执行提交bulkRequest申请,同时也会监听bulkRequest执行异样状态,而后执行工作重试逻辑,重试代码如下:

private void retry(BulkRequest bulkRequestForRetry) {            assert backoff.hasNext();            TimeValue next = backoff.next();            logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());            Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));            scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);        }

RetryHandler将执行失败的bulk申请从新交给了外部scheduler线程池去执行,通过以上代码理解,该线程池外部只保护了一个固定线程,同时该线程池可能还会被另一个flush工作去占用执行。所以如果重试逻辑正在执行的时候,此时线程池内的惟一线程正在执行flush工作,则会阻塞重试逻辑执行,重试逻辑不能执行实现,则不会开释Semaphore,然而因为并发数量配置的是1,所以flush工作线程须要期待其余线程开释一个Semaphore许可后能力继续执行。所以此处造成了循环期待,导致Semaphore和BulkProcessor对象锁都无奈开释,从而使得所有的MQ生产业务线程都阻塞在获取BulkProcessor锁之前。

同时,在GitHub的ES客户端源码客户端上也能搜寻到相似问题,例如: https://github.com/elastic/elasticsearch/issues/47599 ,所以更加印证了之前的猜测,就是因为bulk的一直重试从而引发了BulkProcessor外部的死锁问题。

四、如何解决问题

既然前边曾经理解到了问题产生的起因,所以就有了如下几种解决方案:

1.降级ES客户端版本到7.6正式版,后续版本通过将异样重试工作线程池和flush工作线程池进行了物理隔离,从而防止了线程池的竞争,然而须要思考版本兼容性。

2.因为该死锁问题是由大量异样重试逻辑引起的,能够在不影响业务逻辑的状况勾销重试逻辑,该计划能够不须要降级客户端版本,然而须要评估业务影响,执行失败的申请能够通过其余其余形式进行业务重试。

如有疏漏不妥之处,欢送斧正!

作者:京东批发 曹志飞

起源:京东云开发者社区