A look at Elasticsearch's CachedSupplier

This post takes a look at Elasticsearch's CachedSupplier.

CachedSupplier

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/CachedSupplier.java

public final class CachedSupplier<T> implements Supplier<T> {

    private Supplier<T> supplier;
    private T result;
    private boolean resultSet;

    public CachedSupplier(Supplier<T> supplier) {
        this.supplier = supplier;
    }

    @Override
    public synchronized T get() {
        if (resultSet == false) {
            result = supplier.get();
            resultSet = true;
        }
        return result;
    }

}
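  • CachedSupplier implements the Supplier interface and wraps another supplier. Its synchronized get method calls the wrapped supplier's get at most once and caches the result; later calls return the cached value directly. Because a separate resultSet flag records whether a value has been stored, a null result is cached as well.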
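The caching behaviour described above can be checked with a small standalone sketch. The class is copied locally so the example runs without Elasticsearch on the classpath; CachedSupplierDemo and countDelegateCalls are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class CachedSupplierDemo {

    // Local copy of the class shown above, so the demo compiles on its own.
    static final class CachedSupplier<T> implements Supplier<T> {
        private final Supplier<T> supplier;
        private T result;
        private boolean resultSet;

        CachedSupplier(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public synchronized T get() {
            if (resultSet == false) {
                result = supplier.get();
                resultSet = true;
            }
            return result;
        }
    }

    // Hypothetical helper: calls get() `times` times on a cached supplier
    // and returns how often the wrapped supplier actually ran.
    public static int countDelegateCalls(int times, String valueToReturn) {
        AtomicInteger calls = new AtomicInteger();
        Supplier<String> cached = new CachedSupplier<>(() -> {
            calls.incrementAndGet();
            return valueToReturn;
        });
        for (int i = 0; i < times; i++) {
            cached.get();
        }
        return calls.get();
    }

    public static void main(String[] args) {
        // Wrapped supplier runs once, no matter how many reads follow.
        System.out.println(countDelegateCalls(3, "value"));
        // A null result is cached too, thanks to the resultSet flag.
        System.out.println(countDelegateCalls(3, null));
    }
}
```

The wrapped supplier is not invoked at construction time, only on the first get(), so a CachedSupplier that is never read costs nothing beyond the wrapper object.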

Example

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/search/query/TopDocsCollectorContext.java

    abstract static class SimpleTopDocsCollectorContext extends TopDocsCollectorContext {

        private static TopDocsCollector<?> createCollector(@Nullable SortAndFormats sortAndFormats, int numHits,
                @Nullable ScoreDoc searchAfter, int hitCountThreshold) {
            if (sortAndFormats == null) {
                return TopScoreDocCollector.create(numHits, searchAfter, hitCountThreshold);
            } else {
                return TopFieldCollector.create(sortAndFormats.sort, numHits, (FieldDoc) searchAfter, hitCountThreshold);
            }
        }

        protected final @Nullable SortAndFormats sortAndFormats;
        private final Collector collector;
        private final Supplier<TotalHits> totalHitsSupplier;
        private final Supplier<TopDocs> topDocsSupplier;
        private final Supplier<Float> maxScoreSupplier;

        /**
         * Ctr
         * @param reader The index reader
         * @param query The Lucene query
         * @param sortAndFormats The query sort
         * @param numHits The number of top hits to retrieve
         * @param searchAfter The doc this request should "search after"
         * @param trackMaxScore True if max score should be tracked
         * @param trackTotalHitsUpTo True if the total number of hits should be tracked
         * @param hasFilterCollector True if the collector chain contains at least one collector that can filters document
         */
        private SimpleTopDocsCollectorContext(IndexReader reader,
                                              Query query,
                                              @Nullable SortAndFormats sortAndFormats,
                                              @Nullable ScoreDoc searchAfter,
                                              int numHits,
                                              boolean trackMaxScore,
                                              int trackTotalHitsUpTo,
                                              boolean hasFilterCollector) throws IOException {
            super(REASON_SEARCH_TOP_HITS, numHits);
            this.sortAndFormats = sortAndFormats;

            final TopDocsCollector<?> topDocsCollector;
            if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_DISABLED) {
                // don't compute hit counts via the collector
                topDocsCollector = createCollector(sortAndFormats, numHits, searchAfter, 1);
                topDocsSupplier = new CachedSupplier<>(topDocsCollector::topDocs);
                totalHitsSupplier = () -> new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
            } else {
                // implicit total hit counts are valid only when there is no filter collector in the chain
                final int hitCount = hasFilterCollector ? -1 : shortcutTotalHitCount(reader, query);
                if (hitCount == -1) {
                    topDocsCollector = createCollector(sortAndFormats, numHits, searchAfter, trackTotalHitsUpTo);
                    topDocsSupplier = new CachedSupplier<>(topDocsCollector::topDocs);
                    totalHitsSupplier = () -> topDocsSupplier.get().totalHits;
                } else {
                    // don't compute hit counts via the collector
                    topDocsCollector = createCollector(sortAndFormats, numHits, searchAfter, 1);
                    topDocsSupplier = new CachedSupplier<>(topDocsCollector::topDocs);
                    totalHitsSupplier = () -> new TotalHits(hitCount, TotalHits.Relation.EQUAL_TO);
                }
            }
            MaxScoreCollector maxScoreCollector = null;
            if (sortAndFormats == null) {
                maxScoreSupplier = () -> {
                    TopDocs topDocs = topDocsSupplier.get();
                    if (topDocs.scoreDocs.length == 0) {
                        return Float.NaN;
                    } else {
                        return topDocs.scoreDocs[0].score;
                    }
                };
            } else if (trackMaxScore) {
                maxScoreCollector = new MaxScoreCollector();
                maxScoreSupplier = maxScoreCollector::getMaxScore;
            } else {
                maxScoreSupplier = () -> Float.NaN;
            }
            this.collector = MultiCollector.wrap(topDocsCollector, maxScoreCollector);
        }

        @Override
        Collector create(Collector in) {
            assert in == null;
            return collector;
        }

        TopDocsAndMaxScore newTopDocs() {
            TopDocs in = topDocsSupplier.get();
            float maxScore = maxScoreSupplier.get();
            final TopDocs newTopDocs;
            if (in instanceof TopFieldDocs) {
                TopFieldDocs fieldDocs = (TopFieldDocs) in;
                newTopDocs = new TopFieldDocs(totalHitsSupplier.get(), fieldDocs.scoreDocs, fieldDocs.fields);
            } else {
                newTopDocs = new TopDocs(totalHitsSupplier.get(), in.scoreDocs);
            }
            return new TopDocsAndMaxScore(newTopDocs, maxScore);
        }

        @Override
        void postProcess(QuerySearchResult result) throws IOException {
            final TopDocsAndMaxScore topDocs = newTopDocs();
            result.topDocs(topDocs, sortAndFormats == null ? null : sortAndFormats.formats);
        }
    }
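  • The SimpleTopDocsCollectorContext constructor wraps topDocsCollector::topDocs in a CachedSupplier to create topDocsSupplier; the newTopDocs method later calls topDocsSupplier.get() to obtain the TopDocs. In some branches totalHitsSupplier and maxScoreSupplier also read from topDocsSupplier.get(), so all of these reads share the same cached TopDocs and topDocsCollector::topDocs runs at most once.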
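The pattern of several derived suppliers reading from one cached supplier can be sketched without Lucene or Elasticsearch. This is only an illustration under stated assumptions: Hits is a hypothetical stand-in for TopDocs, the counter stands in for the cost of topDocsCollector::topDocs, and none of these names come from the Elasticsearch code base.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class SharedCachedSupplierDemo {

    // Minimal stand-in for org.elasticsearch.common.util.CachedSupplier.
    static final class CachedSupplier<T> implements Supplier<T> {
        private final Supplier<T> supplier;
        private T result;
        private boolean resultSet;

        CachedSupplier(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public synchronized T get() {
            if (resultSet == false) {
                result = supplier.get();
                resultSet = true;
            }
            return result;
        }
    }

    // Hypothetical stand-in for TopDocs: hit scores, best first.
    static final class Hits {
        final float[] scores;

        Hits(float[] scores) {
            this.scores = scores;
        }
    }

    final AtomicInteger collectCalls = new AtomicInteger();
    final Supplier<Hits> hitsSupplier;
    final Supplier<Integer> totalSupplier;
    final Supplier<Float> maxScoreSupplier;

    SharedCachedSupplierDemo(float[] scores) {
        // The "expensive" step is wrapped once and shared by the suppliers below.
        hitsSupplier = new CachedSupplier<>(() -> {
            collectCalls.incrementAndGet();
            return new Hits(scores);
        });
        totalSupplier = () -> hitsSupplier.get().scores.length;
        maxScoreSupplier = () -> {
            Hits hits = hitsSupplier.get();
            return hits.scores.length == 0 ? Float.NaN : hits.scores[0];
        };
    }

    // Hypothetical helper: reads every supplier once and reports how often
    // the wrapped "collect" step actually ran.
    public static int collectCallsAfterReadingAll(float[] scores) {
        SharedCachedSupplierDemo demo = new SharedCachedSupplierDemo(scores);
        demo.hitsSupplier.get();
        demo.totalSupplier.get();
        demo.maxScoreSupplier.get();
        return demo.collectCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(collectCallsAfterReadingAll(new float[] {2.5f, 1.0f}));
    }
}
```

Without the CachedSupplier wrapper, each of the three reads in this sketch would rerun the wrapped step, which is the cost the wrapper is there to avoid.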

Summary

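CachedSupplier implements the Supplier interface and wraps another supplier: its get method calls the wrapped supplier's get only once, caches the result, and returns the cached result on every later call.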

doc

  • CachedSupplier