关于elasticsearch:ElasticSearch-批量更新bulk死锁问题排查-京东云技术团队

4次阅读

共计 9775 个字符,预计需要花费 25 分钟才能阅读完成。

一、问题零碎介绍

  1. 监听商品变更 MQ 音讯,查问商品最新的信息,调用 BulkProcessor 批量更新 ES 集群中的商品字段信息;
  2. 因为商品数据十分多,所以将商品数据存储到 ES 集群上,整个 ES 集群共划分了 256 个分片,并依据商品的三级类目 ID 进行分片路由。

比方一个 SKU 的商品名称发生变化,咱们就会收到这个 SKU 的变更 MQ 音讯,而后再去查问商品接口,将商品的最新名称查问回来,再依据这个 SKU 的三级分类 ID 进行路由,找到对应的 ES 集群分片,而后更新商品名称字段信息。

因为商品变更 MQ 音讯量微小,为了晋升更新 ES 的性能,防止出现 MQ 音讯积压问题,所以本零碎应用了 BulkProcessor 进行批量异步更新。

ES 客户端版本如下:

        <dependency>
            <artifactId>elasticsearch-rest-client</artifactId>
            <groupId>org.elasticsearch.client</groupId>
            <version>6.5.3</version>
        </dependency>

BulkProcessor 配置伪代码如下:

        // 在这里调用 build() 办法结构 bulkProcessor, 在底层实际上是用了 bulk 的异步操作
        this.fullDataBulkProcessor = BulkProcessor.builder((request, bulkListener) ->
                fullDataEsClient.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
                // 1000 条数据申请执行一次 bulk
                .setBulkActions(1000)
                // 5mb 的数据刷新一次 bulk
                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                // 并发申请数量, 0 不并发, 1 并发容许执行
                .setConcurrentRequests(1)
                // 固定 1s 必须刷新一次
                .setFlushInterval(TimeValue.timeValueSeconds(1L))
                // 重试 5 次,距离 1s
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
                .build();

二、问题怎么发现的

  1. 618 大促开始后,因为商品变更 MQ 音讯十分频繁,MQ 音讯每天的音讯量更是达到了日常的数倍,而且好多商品还变更了三级类目 ID;
  2. 零碎在更新这些三级类目 ID 发生变化的 SKU 商品信息时,依据批改后的三级类目 ID 路由后的分片更新商品信息时产生了谬误,并且重试了 5 次,仍然没有胜利;
  3. 因为在新路由的分片上没有这个商品的索引信息,这些更新申请永远也不会执行胜利,零碎的日志文件中也记录了大量的异样重试日志。
  4. 商品变更 MQ 音讯也开始呈现了积压报警,MQ 音讯的生产速度显著赶不上生产速度。
  5. 察看 MQ 音讯消费者的 UMP 监控数据,发现生产性能很安稳,没有显著稳定,然而调用次数会在零碎生产 MQ 一段时间后呈现断崖式降落,由原来的每分钟几万调用量逐步降落到个位数。
  6. 在重启利用后,零碎又开始生产,UMP 监控调用次数复原到失常程度,然而零碎运行一段时间后,还是会呈现生产暂停问题,好像所有生产线程都被暂停了一样。

三、排查问题的具体过程

首先找一台暂停生产 MQ 音讯的容器,查看利用过程 ID,应用 jstack 命令 dump 利用过程的整个线程堆栈信息,将导出的线程堆栈信息打包上传到 https://fastthread.io/ 进行线程状态剖析。剖析报告如下:

通过剖析报告发现有 124 个处于 BLOCKED 状态的线程,而后能够点击查看各线程的具体堆栈信息,堆栈信息如下:

间断查看多个线程的具体堆栈信息,MQ 生产线程都是在 waiting to lock <0x00000005eb781b10> (a org.elasticsearch.action.bulk.BulkProcessor),而后依据 0x00000005eb781b10 去搜寻发现,这个对象锁正在被另外一个线程占用,占用线程堆栈信息如下:

这个线程状态此时正处于 WAITING 状态,通过线程名称发现,该线程应该是 ES 客户端外部线程。正是该线程抢占了业务线程的锁,而后又在期待其余条件触发该线程执行,所以导致了所有的 MQ 生产业务线程始终无奈获取 BulkProcessor 外部的锁,导致呈现了生产暂停问题。

然而这个线程 elasticsearchscheduler 为啥不能执行?它是什么时候启动的?又有什么作用?

就须要咱们对 BulkProcessor 进行深入分析, 因为 BulkProcessor 是通过 builder 模块进行创立的,所以深刻 builder 源码,理解一下 BulkProcessor 的创立过程。

public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {Objects.requireNonNull(consumer, "consumer");
        Objects.requireNonNull(listener, "listener");
        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
        return new Builder(consumer, listener,
                (delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
                () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
    }

外部创立了一个工夫调度执行线程池,线程命名规定和上述持有锁的线程名称类似,具体代码如下:

static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
                EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduler.setRemoveOnCancelPolicy(true);
        return scheduler;
    }

最初在 build 办法外部执行了 BulkProcessor 的外部有参构造方法,在构造方法外部启动了一个周期性执行的 flushing 工作,代码如下

 BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
                  int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
                  Scheduler scheduler, Runnable onClose) {
        this.bulkActions = bulkActions;
        this.bulkSize = bulkSize.getBytes();
        this.bulkRequest = new BulkRequest();
        this.scheduler = scheduler;
        this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
        // Start period flushing task after everything is setup
        this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
        this.onClose = onClose;
    }
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {if (flushInterval == null) {return new Scheduler.Cancellable() {
                @Override
                public void cancel() {}

                @Override
                public boolean isCancelled() {return true;}
            };
        }
        final Runnable flushRunnable = scheduler.preserveContext(new Flush());
        return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
    }
class Flush implements Runnable {

        @Override
        public void run() {synchronized (BulkProcessor.this) {if (closed) {return;}
                if (bulkRequest.numberOfActions() == 0) {return;}
                execute();}
        }
    }

通过源代码发现,该 flush 工作就是在创立 BulkProcessor 对象时设置的固定工夫 flush 逻辑,当 setFlushInterval 办法参数失效,就会启动一个后盾定时 flush 工作。flush 距离,由 setFlushInterval 办法参数定义。该 flush 工作在运行期间,也会抢占 BulkProcessor 对象锁,抢到锁后,才会执行 execute 办法。具体的办法调用关系源代码如下:

/**
     * Adds the data from the bytes to be processed by the bulk processor
     */
    public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
                                          @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
        executeIfNeeded();
        return this;
    }

    private void executeIfNeeded() {ensureOpen();
        if (!isOverTheLimit()) {return;}
        execute();}

    // (currently) needs to be executed under a lock
    private void execute() {
        final BulkRequest bulkRequest = this.bulkRequest;
        final long executionId = executionIdGen.incrementAndGet();

        this.bulkRequest = new BulkRequest();
        this.bulkRequestHandler.execute(bulkRequest, executionId);
    }

而上述代码中的 add 办法,则是由 MQ 生产业务线程去调用,在该办法上同样有一个 synchronized 关键字,所以生产 MQ 业务线程会和 flush 工作执行线程间接会存在锁竞争关系。具体 MQ 生产业务线程调用伪代码如下:

 @Override
 public void upsertCommonSku(CommonSkuEntity commonSkuEntity) {String source = JsonUtil.toString(commonSkuEntity);
            UpdateRequest updateRequest = new UpdateRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
            updateRequest.doc(source, XContentType.JSON);
            IndexRequest indexRequest = new IndexRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
            indexRequest.source(source, XContentType.JSON);
            updateRequest.upsert(indexRequest);
            updateRequest.routing(commonSkuEntity.getCat3().toString());
            fullbulkProcessor.add(updateRequest);
}  

通过以上对线程堆栈剖析,发现所有的业务线程都在期待 elasticsearchscheduler 线程开释 BulkProcessor 对象锁,然而该线程确始终没有开释该对象锁,从而呈现了业务线程的死锁问题。

联合利用日志文件中呈现的大量异样重试日志,可能与 BulkProcessor 的异样重试策略无关,而后进一步理解 BulkProcessor 的异样重试代码逻辑。因为业务线程中提交 BulkRequest 申请都对立提交到了 BulkRequestHandler 对象中的 execute 办法外部进行解决,代码如下:

public final class BulkRequestHandler {
    private final Logger logger;
    private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
    private final BulkProcessor.Listener listener;
    private final Semaphore semaphore;
    private final Retry retry;
    private final int concurrentRequests;

    BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
                       BulkProcessor.Listener listener, Scheduler scheduler, int concurrentRequests) {
        assert concurrentRequests >= 0;
        this.logger = Loggers.getLogger(getClass());
        this.consumer = consumer;
        this.listener = listener;
        this.concurrentRequests = concurrentRequests;
        this.retry = new Retry(backoffPolicy, scheduler);
        this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
    }

    public void execute(BulkRequest bulkRequest, long executionId) {Runnable toRelease = () -> {};
        boolean bulkRequestSetupSuccessful = false;
        try {listener.beforeBulk(executionId, bulkRequest);
            semaphore.acquire();
            toRelease = semaphore::release;
            CountDownLatch latch = new CountDownLatch(1);
            retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse response) {
                    try {listener.afterBulk(executionId, bulkRequest, response);
                    } finally {semaphore.release();
                        latch.countDown();}
                }

                @Override
                public void onFailure(Exception e) {
                    try {listener.afterBulk(executionId, bulkRequest, e);
                    } finally {semaphore.release();
                        latch.countDown();}
                }
            });
            bulkRequestSetupSuccessful = true;
            if (concurrentRequests == 0) {latch.await();
            }
        } catch (InterruptedException e) {Thread.currentThread().interrupt();
            logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } catch (Exception e) {logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } finally {if (bulkRequestSetupSuccessful == false) {// if we fail on client.bulk() release the semaphore
                toRelease.run();}
        }
    }

    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {semaphore.release(this.concurrentRequests);
            return true;
        }
        return false;
    }
}

BulkRequestHandler 通过构造方法初始化了一个 Retry 工作对象,该对象中也传入了一个 Scheduler,且该对象和 flush 工作中传入的是同一个线程池,该线程池外部只保护了一个固定线程。而 execute 办法首先会先依据 Semaphore 来管制并发执行数量,该并发数量在构建 BulkProcessor 时通过参数指定,通过上述配置发现该值配置为 1。所以每次只容许一个线程执行该办法。即 MQ 生产业务线程和 flush 工作线程,同一时间只能有一个线程能够执行。而后上面在理解一下重试工作是如何执行的,具体看如下代码:

 public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest,
                            ActionListener<BulkResponse> listener) {RetryHandler r = new RetryHandler(backoffPolicy, consumer, listener, scheduler);
        r.execute(bulkRequest);
    }

RetryHandler 外部会执行提交 bulkRequest 申请,同时也会监听 bulkRequest 执行异样状态,而后执行工作重试逻辑,重试代码如下:

private void retry(BulkRequest bulkRequestForRetry) {assert backoff.hasNext();
            TimeValue next = backoff.next();
            logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
            Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
            scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);
        }

RetryHandler 将执行失败的 bulk 申请从新交给了外部 scheduler 线程池去执行,通过以上代码理解,该线程池外部只保护了一个固定线程,同时该线程池可能还会被另一个 flush 工作去占用执行。所以如果重试逻辑正在执行的时候,此时线程池内的惟一线程正在执行 flush 工作,则会阻塞重试逻辑执行,重试逻辑不能执行实现,则不会开释 Semaphore,然而因为并发数量配置的是 1,所以 flush 工作线程须要期待其余线程开释一个 Semaphore 许可后能力继续执行。所以此处造成了循环期待,导致 Semaphore 和 BulkProcessor 对象锁都无奈开释,从而使得所有的 MQ 生产业务线程都阻塞在获取 BulkProcessor 锁之前。

同时,在 GitHub 的 ES 客户端源码客户端上也能搜寻到相似问题,例如:https://github.com/elastic/elasticsearch/issues/47599,所以更加印证了之前的猜测,就是因为 bulk 的一直重试从而引发了 BulkProcessor 外部的死锁问题。

四、如何解决问题

既然前边曾经理解到了问题产生的起因,所以就有了如下几种解决方案:

1. 降级 ES 客户端版本到 7.6 正式版,后续版本通过将异样重试工作线程池和 flush 工作线程池进行了物理隔离,从而防止了线程池的竞争,然而须要思考版本兼容性。

2. 因为该死锁问题是由大量异样重试逻辑引起的,能够在不影响业务逻辑的状况勾销重试逻辑,该计划能够不须要降级客户端版本,然而须要评估业务影响,执行失败的申请能够通过其余其余形式进行业务重试。

如有疏漏不妥之处,欢送斧正!

作者:京东批发 曹志飞

起源:京东云开发者社区

正文完
 0