共计 6834 个字符,预计需要花费 18 分钟才能阅读完成。
将类似或反复申请在上游零碎中合并后发往上游零碎,能够大大降低上游零碎的负载,晋升零碎整体吞吐率。文章介绍了 hystrix collapser、ConcurrentHashMultiset、自实现 BatchCollapser 三种申请合并技术,并通过其具体实现比照各自实用的场景。
前言
工作中,咱们常见的申请模型都是”申请 - 应答”式,即一次申请中,服务给申请调配一个独立的线程,一块独立的内存空间,所有的操作都是独立的,包含资源和零碎运算。咱们也晓得,在申请中解决一次零碎 I/O 的耗费是十分大的,如果有十分多的申请都进行同一类 I/O 操作,那么是否能够将这些 I/O 操作都合并到一起,进行一次 I/O 操作,是否能够大大降低上游资源服务器的累赘呢?
最近我工作之余的大部分工夫都花在这个问题的探索上了,比照了几个现有类库,为了解决一个小问题把 hystrix javanica 的代码翻了一遍,也依据本人工作中遇到的业务需要实现了一个简略的合并类,播种还是挺大的。可能这个需要有点”偏门”,在网上搜寻后果并不多,也没有综合一点的材料,索性本人总结分享一下,心愿能帮到起初遇到这种问题的小伙伴。
Hystrix Collapser
hystrix
开源的申请合并类库(出名的)如同也只有 Netflix 公司开源的 Hystrix 了,hystrix 专一于放弃 WEB 服务器在高并发环境下的零碎稳固,咱们罕用它的熔断器(Circuit Breaker) 来实现服务的服务隔离和灾时降级,有了它,能够使整个零碎不至于被某一个接口的高并发洪流冲塌,即便接口挂了也能够将服务降级,返回一个人性化的响应。申请合并作为一个保障上游服务稳固的利器,在 hystrix 内实现也并不意外。
咱们在应用 hystrix 时,罕用它的 javanica 模块,以注解的形式编写 hystrix 代码,使代码更简洁而且对业务代码侵入更低。所以在我的项目中咱们个别至多须要援用 hystrix-core 和 hystrix-javanica 两个包。
另外,hystrix 的实现都是通过 AOP,咱们要还要在我的项目 xml 里显式配置 HystrixAspect 的 bean 来启用它。
<aop:aspectj-autoproxy/>
<bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" />
collapser
hystrix collapser 是 hystrix 内的申请合并器,它有自定义 BatchMethod 和 注解两种实现形式,自定义 BatchMethod 网上有各种教程,实现起来很简单,须要手写大量代码,而注解形式只须要增加两行注解即可,但配置形式我在官网文档上也没找见,中文方面本文应该是唯一份儿了。
其实现须要留神的是:
咱们在须要合并的办法上增加 @HystrixCollapser 注解,在定义好的合并办法上增加 @HystrixCommand 注解;single 办法只能传入一个参数,多参数状况下须要本人包装一个参数类,而 batch 办法须要 java.util.List<SingleParam>;single 办法返回 java.util.concurrent.Future<SingleReturn>,batch 办法返回 java.util.List<SingleReturn>,且要保障返回的后果数量和传入的参数数量统一。
上面是一个简略的示例:
public class HystrixCollapserSample {@HystrixCollapser(batchMethod = "batch")
public Future<Boolean> single(String input) {return null; // single 办法不会被执行到}
public List<Boolean> batch(List<String> inputs) {return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList());
}
}
源码实现
为了解决 hystrix collapser
的配置问题看了下 hystrix javanica 的源码,这里简略总结一下 hystrix 申请合并器的具体实现,源码的具体解析在我的笔记: Hystrix collasper
源码解析。
在 spring-boot 内注册切面类的 bean,外面蕴含 @HystrixCollapser 注解切面;
在办法执行时检测到办法被 HystrixCollapser 注解后,spring 调用 methodsAnnotatedWithHystrixCommand
办法来执行 hystrix 代理;
hystrix 获取一个 collapser 实例(在以后 scope 内检测不到即创立);
hystrix 将以后申请的参数提交给 collapser,由 collapser 存储在一个 concurrentHashMap RequestArgumentType -> CollapsedRequest)
内,此办法会创立一个 Observable 对象,并返回一个 察看此对象的 Future 给业务线程;
collpser 在创立时会创立一个 timer 线程,定时生产存储的申请,timer 会将多个申请结构成一个合并后的申请,调用 batch 执行后将后果程序映射到输入参数,并告诉 Future 工作已实现。
须要留神,因为须要期待 timer 执行真正的申请操作,collapser 会导致所有的申请的 cost 都会减少约 timerInterval/2 ms;
配置
hystrix collapser
的配置须要在 @HystrixCollapser
注解上应用,次要包含两个局部,专有配置和 hystrixCommand 通用配置;
专有配置包含:
collapserKey
,这个能够不必配置,hystrix 会默认应用以后办法名;
`batchMethod`,配置 batch 办法名,咱们个别会将 single 办法和 batch 办法定义在同一个类内,间接填办法名即可;`scope `,最坑的配置项,也是逼我读源码的首恶,` com.netflix.hystrix.HystrixCollapser.Scope` 枚举类,有 REQUEST, GLOBAL 两种选项,在 scope 为 REQUEST 时,hystrix 会为每个申请都创立一个 collapser,此时你会发现 batch 办法执行时,传入的申请数总为 1。而且 REQUEST 项还是默认项,不明确这样申请合并还有什么意义;`collapserProperties` , 在此选项内咱们能够配置 ` hystrixCommand` 的通用配置;
通用配置包含:
`maxRequestsInBatch `, 结构批量申请时,应用的单个申请的最大数量;`timerDelayInMilliseconds` , 此选项配置 collapser 的 timer 线程多久会合并一次申请;
requestCache.enabled
, 配置提交申请时是否缓存;
一个残缺的配置如下:
@HystrixCollapser(
batchMethod = "batch",
collapserKey = "single",
scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
collapserProperties = {@HystrixProperty(name = "maxRequestsInBatch", value = "100"),
@HystrixProperty(name = "timerDelayInMilliseconds", value = "1000"),
@HystrixProperty(name = "requestCache.enabled", value = "true")
})
BatchCollapser
设计
因为业务需要,咱们并不太关怀被合并申请的返回值,而且感觉 hystrix 放弃那么多的 Future 并没有必要,于是本人实现了一个简略的申请合并器,业务线程简略地将申请放到一个容器里,申请数累积到一定量或提早了肯定的工夫,就取出容器内的数据对立发送给上游零碎。
设计思维跟 hystrix 相似,合并器有一个字段作为存储申请的容器,且设置一个 timer 线程定时生产容器内的申请,业务线程将申请参数提交到合并 器的容器内。不同之处在于,业务线程将申请提交给容器后立刻同步返回胜利,不用管申请的生产后果,这样便实现了工夫维度上的合并触发。
另外,我还增加了另外一个维度的触发条件,每次将申请参数增加到容器后都会测验一下容器内申请的数量,如果数量达到肯定的阈值,将在业务线程内合并执行一次。
因为有两个维度会触发合并,就不可避免会遇到线程平安问题。为了保障容器内的申请不会被多个线程反复生产或都漏掉,我须要一个容器能满足以下条件:
是一种 Collection,相似于 ArrayList 或 Queue,能够存反复元素且有程序;
在多线程环境中能平安地将外面的数据全取出来进行生产,而不必本人实现锁。
java.util.concurrent
包内的 LinkedBlockingDeque 刚好符合要求,首先它实现了 BlockingDeque 接口,多线程环境下的存取操作是平安的;此外,它还提供 drainTo(Collection<? super E> c, int maxElements)
办法,能够将容器内 maxElements 个元素平安地取出来,放到 Collection c 中。
实现
以下是具体的代码实现:
public class BatchCollapser<E> implements InitializingBean {private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class);
private static volatile Map<Class, BatchCollapser> instance = Maps.newConcurrentMap();
private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);
private volatile LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>();
private Handler<List<E>, Boolean> cleaner;
private long interval;
private int threshHold;
private BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
this.cleaner = cleaner;
this.threshHold = threshHold;
this.interval = interval;
}
@Override
public void afterPropertiesSet() throws Exception {SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
try {this.clean();
} catch (Exception e) {logger.error("clean container exception", e);
}
}, 0, interval, TimeUnit.MILLISECONDS);
}
public void submit(E event) {batchContainer.add(event);
if (batchContainer.size() >= threshHold) {clean();
}
}
private void clean() {List<E> transferList = Lists.newArrayListWithExpectedSize(threshHold);
batchContainer.drainTo(transferList, 100);
if (CollectionUtils.isEmpty(transferList)) {return;}
try {cleaner.handle(transferList);
} catch (Exception e) {logger.error("batch execute error, transferList:{}", transferList, e);
}
}
public static <E> BatchCollapser getInstance(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {Class jobClass = cleaner.getClass();
if (instance.get(jobClass) == null) {synchronized (BatchCollapser.class) {if (instance.get(jobClass) == null) {instance.put(jobClass, new BatchCollapser<>(cleaner, threshHold, interval));
}
}
}
return instance.get(jobClass);
}
}
以下代码内须要留神的点:
因为合并器的全局性需要,须要将合并器实现为一个单例,另外为了晋升它的通用性,外部应用应用 concurrentHashMap 和 double check 实现了一个简略的单例工厂。为了辨别不同用处的合并器,工厂须要传入一个实现了 Handler 的实例,通过实例的 class 来对申请进行分组存储。因为 ` java.util.Timer ` 的阻塞个性,一个 Timer 线程在阻塞时不会启动另一个同样的 Timer 线程,所以应用 `ScheduledExecutorService` 定时启动 Timer 线程。
ConcurrentHashMultiset
设计
下面介绍的申请合并都是将多个申请一次发送,上游服务器解决时实质上还是多个申请,最好的申请合并是在内存中进行,将申请后果简略合并成一个发送给上游服务器。如咱们常常会遇到的需要:元素分值累加或数据统计,就能够先在内存中将某一项的分值或数据累加起来,定时申请数据库保留。
Guava 内就提供了这么一种数据结构: ConcurrentHashMultiset
,它不同于一般的 set 构造存储雷同元素时间接笼罩原有元素,而是给每个元素放弃一个计数 count, 插入反复时元素的 count 值加 1。而且它在增加和删除时并不加锁也能保障线程平安,具体实现是通过一个 while(true) 循环尝试操作,直到操作够所须要的数量。
ConcurrentHashMultiset 这种排重计数的个性,非常适合数据统计这种元素在短时间内反复率很高的场景,通过排重后的数量计算,能够大大降低上游服务器的压力,即便反复率不高,能用大量的内存空间换取零碎可用性的进步,也是很划算的。
实现
应用 ConcurrentHashMultiset
进行申请合并与应用一般容器在整体构造上并无太大差别,具体相似于:
if (ConcurrentHashMultiset.isEmpty()) {return;}
List<Request> transferList = Lists.newArrayList();
ConcurrentHashMultiset.elementSet().forEach(request -> {int count = ConcurrentHashMultiset.count(request);
if (count <= 0) {return;}
transferList.add(count == 1 ? request : new Request(request.getIncrement() * count));
ConcurrentHashMultiset.remove(request, count);
});
小结
最初总结一下各个技术实用的场景:
hystrix collapser
: 须要每个申请的后果,并且不在意每个申请的 cost 会减少;
BatchCollapser
: 不在意申请的后果,须要申请合并能在工夫和数量两个维度上触发;
ConcurrentHashMultiset
:申请反复率很高的统计类场景;
另外,如果抉择本人来实现的话,齐全能够将 BatchCollapser
和 ConcurrentHashMultiset
联合一下,在 BatchCollapser 里应用 ConcurrentHashMultiset
作为容器,这样就能够联合两者的劣势了