About stream: Still using peek in your streams? Don't get tripped up by these pitfalls

Introduction

Ever since streams were introduced into the JDK, everything seems to have become simpler. The various methods streams provide, such as map, peek, flatMap and so on, make our programming more pleasant. In fact, I often see colleagues use peek in projects to perform business logic. But since the JDK documentation says peek is mainly meant for debugging, peek must have some little-known drawbacks. Let's take a look.

Definition and basic usage of peek

First, the definition of peek:

```java
Stream<T> peek(Consumer<? super T> action);
```

peek accepts a Consumer argument and returns a Stream. Consumer is a FunctionalInterface whose single method to implement is:

```java
void accept(T t);
```

accept processes the incoming argument T but does not return any result.

Basic usage of peek:

```java
public static void peekOne(){
    Stream.of(1, 2, 3)
          .peek(e -> log.info(String.valueOf(e)))
          .toList();
}
```

Running the code above produces:

```
[main] INFO com.flydean.Main - 1
[main] INFO com.flydean.Main - 2
[main] INFO com.flydean.Main - 3
```

The logic is simple: it just prints the elements of the stream.

peek as a pipelined operation

As a Stream method, peek is of course processed as part of the pipeline. The following concrete example shows how pipelined processing actually operates:

```java
public static void peekForEach(){
    Stream.of(1, 2, 3)
          .peek(e -> log.info(String.valueOf(e)))
          .forEach(e -> log.info("forEach" + e));
}
```

This time we replaced toList with forEach, and the log output shows what actually happens:

```
[main] INFO com.flydean.Main - 1
[main] INFO com.flydean.Main - forEach1
[main] INFO com.flydean.Main - 2
[main] INFO com.flydean.Main - forEach2
[main] INFO com.flydean.Main - 3
[main] INFO com.flydean.Main - forEach3
```

The log shows that each element of the stream passes through peek and then forEach in turn; it is not the case that all elements are peeked first and only then handed to forEach. ...
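One pitfall the article hints at can be shown concretely: because streams are lazy and some terminal operations short-circuit, side effects inside peek may never run for most elements. The sketch below (class name `PeekPitfall` is my own, not from the article) records which elements peek actually saw when the terminal operation is `findFirst`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class PeekPitfall {
    // Returns the elements that peek actually processed before the terminal op finished.
    public static List<Integer> peekedElements() {
        List<Integer> seen = new ArrayList<>();
        Stream.of(1, 2, 3)
              .peek(seen::add)   // side effect: record every element flowing through
              .findFirst();      // short-circuits after the first element
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(peekedElements()); // only [1] — 2 and 3 were never peeked
    }
}
```

If the side effect in peek carried business logic, it would silently be skipped for elements 2 and 3, which is why peek is best reserved for debugging.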

March 21, 2023 · 2 min · jiezi

About stream: A brief look at how Stream.reduce works

Basic usage

Let's start with a simple example, an algorithm exercise called "Words".

Problem description: every sentence is made up of several words, and each word may have a different length. Assume each word's length Ni is that word's weight; you must output the average weight V of the whole sentence.

Constraints: time limit 1000 ms, memory limit 100 MB.

Input: a single line containing a string S (length at most 100) representing the sentence; it contains only upper- and lower-case English letters, with one space between words.

Output: the average weight V of sentence S (rounded to two decimal places).

Sample input: Who Love Solo
Sample output: 3.67

The problem asks for the average word length of a sentence: sum the total length and divide by the word count — a perfect fit for reduce().

```java
public class Demo {
    public static void main(String[] args) {
        Scanner sc = new Scanner(System.in);
        String[] s = sc.nextLine().split(" ");
        double res = Arrays.stream(s).mapToDouble(a -> a.length()).reduce(0, (a, b) -> a + b);
        System.out.println(String.format("%.2f", res / s.length));
    }
}
```

In this code, `.reduce(0, (a, b) -> a + b)` is the classic usage. We first need to understand what a and b mean, then learn the key concepts: the identity (initial value), the accumulator, and the combiner.

- Identity: an element that is the initial value of the reduction, and the default result of the Stream if it is empty.
- Accumulator: a function taking two parameters — the first is the return value of the previous reduction step, the second is the next element of the Stream.
- Combiner: a function called to combine partial results of the reduction; it is only invoked when the reduction runs in parallel, or when the types of the accumulator's arguments and of its implementation do not match.

So 0 is our initial value, and (a, b) -> a + b is our accumulator, where a is the previous partial result and b is the current stream element; a + b is the combining rule. If we changed it to a * b, we would compute a product instead. We can of course also replace the lambda with a method reference:

```java
double res = Arrays.stream(s).mapToDouble(a -> a.length()).reduce(0, Double::sum);
```

That's the most basic usage — did you get it?

More examples

We can of course use reduce on streams of other types. For example, we can take an array of Strings and concatenate them:

```java
List<String> letters = Arrays.asList("a", "b", "c", "d", "e");
String result = letters.stream().reduce("", (partialString, element) -> partialString + element);
assertThat(result).isEqualTo("abcde");
```
...
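The combiner mentioned above only comes into play on a parallel stream, where partial sums from different threads must be merged. A minimal sketch of the same "Words" total-length computation using the three-argument reduce (class and method names here are my own, not from the article):

```java
import java.util.Arrays;
import java.util.List;

public class ReduceDemo {
    // Three-argument reduce: identity, accumulator, combiner.
    // On a parallel stream the combiner merges the partial sums of each chunk.
    public static int totalLength(List<String> words) {
        return words.parallelStream()
                    .reduce(0,
                            (partial, word) -> partial + word.length(), // accumulator
                            Integer::sum);                              // combiner
    }

    public static void main(String[] args) {
        System.out.println(totalLength(Arrays.asList("Who", "Love", "Solo"))); // 11
    }
}
```

Divide 11 by the word count 3 and format to two decimals and you get the sample answer 3.67.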

January 3, 2023 · 1 min · jiezi

About stream: Java18 — Using streams

Using streams

- Stateless: processes a single element at a time.
- Stateful: processing requires all elements.

| Intermediate (stateless) | Intermediate (stateful) | Terminal (short-circuit) | Terminal (non-short-circuit) |
| ------------------------ | ----------------------- | ------------------------ | ---------------------------- |
| filter                   | distinct                | allMatch                 | forEach                      |
| map                      | skip                    | anyMatch                 | reduce                       |
| flatMap                  | limit                   | noneMatch                | max                          |
| peek                     | sorted                  | findFirst                | min                          |
|                          |                         | findAny                  | collect                      |
|                          |                         |                          | count                        |
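The four columns of the table above can all appear in one pipeline. A minimal sketch (class name `OpsDemo` is my own) combining stateless ops (map), stateful ops (distinct, sorted, limit), and the non-short-circuit terminal collect:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OpsDemo {
    public static List<Integer> pipeline() {
        return Stream.of(3, 1, 2, 3, 5, 4)
                     .distinct()               // stateful: must remember elements already seen
                     .map(n -> n * 10)         // stateless: element-at-a-time transformation
                     .sorted()                 // stateful: needs the entire stream before emitting
                     .limit(3)                 // stateful truncation
                     .collect(Collectors.toList()); // non-short-circuit terminal operation
    }

    public static void main(String[] args) {
        System.out.println(pipeline()); // [10, 20, 30]
    }
}
```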

November 20, 2022 · 1 min · jiezi

About stream: Java Stream

Classification of stream operations

- Intermediate operations
  - Stateless operations (process a single element): filter, map, peek
  - Stateful operations (process all the elements): distinct, sorted, limit
- Terminal operations
  - Non-short-circuit operations: forEach, collect, count
  - Short-circuit operations: anyMatch, findFirst, findAny

| Intermediate (stateless) | Intermediate (stateful) | Terminal (short-circuit) | Terminal (non-short-circuit) |
| ------------------------ | ----------------------- | ------------------------ | ---------------------------- |
| filter                   | distinct                | allMatch                 | forEach                      |
| map                      | skip                    | anyMatch                 | reduce                       |
| flatMap                  | limit                   | noneMatch                | max                          |
| peek                     | sorted                  | findFirst                | collect                      |
|                          |                         | findAny                  | min                          |
|                          |                         |                          | count                        |

November 20, 2022 · 1 min · jiezi

About stream: Using stream

I. Scope of stream usage: operating on elements of arrays and Collection classes (arrays, List, Set, Map).

II. Steps of a stream operation:

1. Create the stream

| API              | Description                                              |
| ---------------- | -------------------------------------------------------- |
| stream()         | creates a new sequential stream object                   |
| parallelStream() | creates a stream object that can execute in parallel     |
| Stream.of()      | creates a new sequential Stream from a series of elements |

2. Intermediate processing of the stream

3. Terminate the stream

III. Using streams: https://www.cnblogs.com/softw...
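The three creation APIs in the table above can be sketched together as follows (class name `CreateDemo` is my own, not from the article):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class CreateDemo {
    public static long count() {
        List<String> list = Arrays.asList("a", "b", "c");
        Stream<String> s1 = list.stream();         // sequential stream from a Collection
        Stream<String> s2 = list.parallelStream(); // parallel stream from a Collection
        Stream<String> s3 = Stream.of("a", "b");   // sequential stream from explicit elements
        return s1.count() + s2.count() + s3.count();
    }

    public static void main(String[] args) {
        System.out.println(count()); // 8 = 3 + 3 + 2
    }
}
```

Note that each stream here is consumed once by count(); a stream cannot be reused after a terminal operation.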

October 13, 2022 · 1 min · jiezi

About stream: JDK8 Stream performance test — just how fast is it?

Study materials for readers — see the end of the article for how to get them.

JDK8 Stream data-flow performance analysis

Stream is a key abstraction added in the Java SE 8 class library. It is defined in java.util.stream (this package contains several stream types: Stream<T> represents a stream of object references, plus a series of specialized streams such as IntStream, LongStream and DoubleStream). The Stream introduced in Java 8 is mainly intended to replace some Collection operations; each stream represents a sequence of values and offers a series of common aggregate operations, making various computations over it convenient. The collection library also provides convenient ways to use collections, arrays and other data structures in a stream style.

Kinds of stream operations

1. Intermediate operations: once the data from the source is on the pipeline, every operation applied to it along the way is an "intermediate operation". An intermediate operation still returns a stream object, so multiple intermediate operations can be chained into a pipeline. Streams offer many kinds of intermediate operations, such as filter, distinct, map, sorted, and so on.
2. Terminal operations: once all intermediate operations are done, taking the data off the pipeline requires executing a terminal operation. For terminal operations, a stream can directly provide the result of the intermediate operations, or convert the result into a specific collection, array, String, etc.

Characteristics of streams

1. Can only be traversed once: data enters from one end of the source, and elements are processed along the pipeline in turn; once an element has passed through the pipeline it can no longer be operated on; you can obtain a fresh stream from the data source to operate again.
2. Internal iteration: processing a Collection is generally done with an Iterator, which is external iteration; with a Stream, you only declare how the data should be processed and the stream object carries the processing out itself — internal iteration. For iterating over large amounts of data, internal iteration can be more efficient than external iteration.

Advantages of streams over Collections

- No storage: a stream does not store values; its elements come from a data source (possibly a data structure, a generator function, an I/O channel, etc.) and are produced through a series of computation steps.
- Functional style: operating on a stream produces a result, but the stream's data source is not modified.
- Lazy evaluation: most stream operations (including filtering, mapping, sorting and deduplication) can be implemented lazily. This lets an entire pipeline run in a single traversal, and short-circuit operations allow even more efficient implementations.
- No upper bound: many problems can be expressed as infinite streams: a user keeps reading from the stream until a satisfactory result appears (for example, enumerating perfect numbers can be expressed as a filter over all integers). Collections are finite, but streams can express the unbounded.
- Concise code: some iterative Collection processing is very concise when written with streams; written as traditional collection iteration, the code can be verbose and hard to read.

Iteration efficiency: stream vs. iterator

OK, with all the advantages of streams listed above — the functional style certainly is pleasant to write — how efficient is stream really?

The conclusion first:

- Traditional iterator (for-loop) iteration performs better than (JDK8) stream iteration, especially on small data sets;
- On multi-core machines, for large data sets, a parallel stream can iterate and process more efficiently than an iterator.

I took a random list (sizes from 10 to 10,000,000) and measured the running efficiency of stream and iterator implementations in mapping, filtering, sorting, reduction/statistics, and string conversion scenarios. ...
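The kind of comparison the article describes can be sketched roughly as below. This is only an indicative micro-benchmark (no warm-up, no JMH, so absolute numbers will vary by machine and JIT state); class and method names are my own:

```java
import java.util.Random;
import java.util.stream.IntStream;

public class PerfSketch {
    // Classic external iteration.
    public static long loopSum(int[] data) {
        long sum = 0;
        for (int v : data) sum += v;
        return sum;
    }

    // Same computation with a stream.
    public static long streamSum(int[] data) {
        return IntStream.of(data).asLongStream().sum();
    }

    public static void main(String[] args) {
        int[] data = new Random(42).ints(1_000_000, 0, 100).toArray();
        long t0 = System.nanoTime();
        long a = loopSum(data);
        long t1 = System.nanoTime();
        long b = streamSum(data);
        long t2 = System.nanoTime();
        // Both must agree on the result; timings are only indicative.
        System.out.printf("loop=%d ns, stream=%d ns, equal=%b%n", t1 - t0, t2 - t1, a == b);
    }
}
```

For trustworthy numbers, a harness such as JMH with proper warm-up iterations is the right tool.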

June 3, 2021 · 1 min · jiezi

About stream: Java8 Stream — code more efficiently, get off work earlier

Operating on collection data is extremely frequent in programming. Using Java 8's Stream to process collections, combined with Lambda functional programming, greatly simplifies code; used sensibly, Stream improves readability. Moreover, the Stream API has been battle-tested by countless projects since Java 8 was released, so its stability and performance speak for themselves — there are plenty of related performance test cases online to consult. If someone tells you "lambdas hurt readability and raise maintenance cost", you can relax. Do make sure to read the caveats at the end.

1. Creating a Stream

There are quite a few ways to create a stream. Here are several common ones (`Lists` below is Google Guava's API). Straight to the code:

```java
// Way 1: Stream.of and other static factories, often used in tests
Stream<String> stream1 = Stream.of("A", "B");
// Way 2: from a Collection (e.g. List, Set)
Stream<String> stream2 = Lists.newArrayList("A", "B").stream();
Stream<String> stream3 = Sets.newHashSet("A", "B").stream();
// Way 3: from an array
Stream<String> stream4 = Arrays.stream(new String[]{"A", "B"});
// Way 4: created through an API, e.g. the file API
Stream<String> stream5 = Files.lines(Paths.get("/file.text"));
// Way 5: streams of primitive types: IntStream, LongStream, DoubleStream
IntStream stream6 = Arrays.stream(new int[] { 1, 2, 3 });
// Way 6: via Stream.builder
Stream<Object> stream7 = Stream.builder().add("A").build();
```

Of these, ways 2 and 3 are the most common. Way 2 can also use parallelStream to create a parallel stream, and the other ways can be converted to parallel streams with the parallel method; on large data sets this improves processing efficiency:

```java
// create a parallel stream directly with parallelStream
Stream<String> stream1 = Lists.newArrayList("A", "B").parallelStream();
// convert an ordinary stream to a parallel stream with parallel
Stream<String> stream2 = Arrays.stream(new String[]{"A", "B"}).parallel();
```

2. Intermediate Stream operations

Stream.map transforms the original data into new data; the mapToInt, mapToLong and mapToDouble methods convert directly to IntStream, LongStream and DoubleStream (less commonly used — look them up yourself if needed):

```java
// append the suffix -N to the original data
List<String> result1 = Lists.newArrayList("A")
        .stream().map(item -> item + "-N").collect(Collectors.toList());
// convert each original string to an array
List<String[]> result2 = Lists.newArrayList("A")
        .stream().map(item -> new String[]{item}).collect(Collectors.toList());
```

Stream.flatMap merges multiple Streams into one Stream; it is often used to merge the data of several Lists. ...
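The flatMap merging mentioned at the cut-off point above can be sketched as follows, using only the standard library instead of Guava (class name `FlatMapDemo` is my own):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapDemo {
    // flatMap flattens a stream of lists into a single stream of their elements.
    public static List<String> flatten(List<List<String>> nested) {
        return nested.stream()
                     .flatMap(List::stream) // each inner list becomes a sub-stream, then merged
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> nested = Arrays.asList(
                Arrays.asList("A", "B"),
                Arrays.asList("C"));
        System.out.println(flatten(nested)); // [A, B, C]
    }
}
```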

May 16, 2021 · 2 min · jiezi

About java8: Processing collections with java8 stream

Grouping a list and then counting:

```java
List<UserInfo> userList = Lists.newArrayList();
Map<String, Long> collect = userList.stream()
        .collect(Collectors.groupingBy(UserInfo::getId, Collectors.counting()));
```

To be updated.
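The snippet above depends on the author's `UserInfo` class and Guava; a self-contained, runnable version of the same groupingBy + counting pattern over plain strings (class name `GroupDemo` is my own):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupDemo {
    // groupingBy + counting: tally how many times each key occurs.
    public static Map<String, Long> countByName(List<String> names) {
        return names.stream()
                    .collect(Collectors.groupingBy(n -> n, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countByName(Arrays.asList("tom", "tom", "amy")));
    }
}
```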

April 14, 2021 · 1 min · jiezi

Redis Streams and Spark: a perfect combination

Source: Redislabs. Author: Roshan Kumar. Translation: Kevin (WeChat public account: 中间件小哥)

Recently, I had the privilege of giving a talk at the Spark + AI Summit titled "Redis + Structured Streaming: a perfect combination to scale out your continuous applications".

My interest in this topic was sparked by the new features that Apache Spark and Redis introduced over the past few months. From my previous experience with Apache Spark, I appreciate its elegance in running batch processing, and its introduction of Structured Streaming in version 2.0 is further progress in that direction.

Meanwhile, Redis recently announced a new data structure for managing streaming data, called "Streams". Redis Streams provides asynchronous communication between producers and consumers, plus persistence, look-back queries, and scale-out options similar to Apache Kafka. In essence, through Streams Redis provides a lightweight, fast, easy-to-manage streaming database, which greatly benefits data engineers.

In addition, the Spark-Redis library was developed so that Redis can be used as a resilient distributed dataset (RDD). Now that we have Structured Streaming and Redis Streams, we decided to extend the Spark-Redis library to integrate Redis Streams as a data source for Apache Spark Structured Streaming.

In last month's talk, I demonstrated how to collect user activity data in Redis Streams and pull it into Apache Spark for real-time data analysis. I built a small, mobile-friendly Node.js application in which people could vote for their favorite dog in a fun competition.

It was a fierce battle; several audience members — even hackers — got creative in attacking my application. They used the "inspect" option to change HTML button names, trying to mess up the app's display. In the end they failed, because Redis Streams, Apache Spark, the Spark-Redis library and my code were all robust enough to handle these attacks effectively. ...

October 17, 2019 · 1 min · jiezi

Want to learn Node.js? You need to get streams straight first

What is a stream

Definition: a Stream is an abstract data interface. Many objects in Node.js implement streams; a stream is an instance of EventEmitter. In short, it is something that emits data (in units of Buffer) or can absorb data; its essence is making data flow. A picture may make it more intuitive: (diagram: bucket-and-pipe data flow, source → pipe → dest)

Note: stream is not a concept unique to Node.js but one of the most basic operating-system operations; Node.js simply has API support for this way of operating. The `|` in linux commands is a stream.

Why learn streams

A video playback example: everyone has surely watched movies online. Against the bucket-pipe diagram in the definition, source is the video on the server side and dest is your own player (or flash / h5 video in the browser). Watching a movie works just like piping water in the diagram: the video flows bit by bit from the server to the local player, playing as it flows, and when it has finished flowing it has finished playing.

Note: in this video playback example, if we did not use pipes and flowing, but instead loaded the full video file from the server first and then played it, there would be many problems:

- The system would stutter or crash from excessive memory usage.
- Our bandwidth, memory and CPU speed are all limited, and shared by multiple programs; a fully loaded video file might be several GB in size.

An example of reading a large data file. Suppose we have a requirement to read a large file data.txt.

Using file reading:

```javascript
const http = require('http');
const fs = require('fs');
const path = require('path');

const server = http.createServer(function (req, res) {
    const fileName = path.resolve(__dirname, 'data.txt');
    fs.readFile(fileName, function (err, data) {
        res.end(data);
    });
});
server.listen(8000);
```

There is nothing wrong with this code syntactically, but if data.txt is very large — hundreds of MB — then while responding to a large number of concurrent user requests the program may consume a lot of memory, possibly causing slow connections for users; and under heavy concurrency the server's memory overhead also becomes very large. Now look at the stream implementation:

```javascript
const http = require('http');
const fs = require('fs');
const path = require('path');

const server = http.createServer(function (req, res) {
    const fileName = path.resolve(__dirname, 'data.txt');
    let stream = fs.createReadStream(fileName); // this line changed
    stream.pipe(res); // this line changed
});
server.listen(8000);
```

With a stream we don't have to read the whole file before responding; we return data while reading, the data flowing to the client through the pipe — genuinely relieving the pressure on the server. ...

August 19, 2019 · 2 min · jiezi

Using groovy stream syntax and a recursive sieve to find the primes below N (with bubble sort and insertion sort exercises)

I recently finished the book "The Solitude of Prime Numbers", which mentions twin primes, so I wanted to look into how twin primes are distributed. The main ingredient is computing primes; searching around, the top-ranked solutions all use for loops, which felt slightly cumbersome, and after some comparison I still preferred a recursive sieve for this problem.

Create a List<Integer>, then starting from index 0, remove from the list every later element divisible by the current one, and so on; what remains at the end are the primes. The code:

```java
static void get(List<Integer> list, int tt) {
    int num = list.get(tt);
    for (int i = tt + 1; i < list.size(); i++) {
        if (list.get(i) % num == 0) list.remove(i--);
    }
    if (list.size() > ++tt) get(list, tt);
}
```

Then take differences of adjacent elements to find the twin primes. Here is the full code for all twin primes below 10000:

```java
List<Integer> list = new ArrayList<>();
for (int i = 2; i < 10000; i++) { // advance by 1, not 2, so odd candidates are included
    list.add(i);
}
get(list, 0);
for (int i = 0; i < list.size() - 1; i++) {
    Integer integer = list.get(i);
    Integer integer1 = list.get(i + 1);
    if (integer1 - integer == 2)
        outputData(TEST_ERROR_CODE, "twin primes:", integer + TAB + TAB + integer1);
}
```

Finally, attached is an exercise with bubble sort and insertion sort: ...
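Since the post's theme is stream syntax, the same prime computation can also be expressed without recursion, as a plain stream filter with trial division (class name `PrimeDemo` and this alternative approach are mine, not the author's):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PrimeDemo {
    // Trial-division primality test expressed with a stream.
    public static boolean isPrime(int n) {
        return n > 1 && IntStream.rangeClosed(2, (int) Math.sqrt(n))
                                 .noneMatch(i -> n % i == 0);
    }

    // All primes below the bound, via a stream filter instead of recursive sieving.
    public static List<Integer> primesBelow(int bound) {
        return IntStream.range(2, bound)
                        .filter(PrimeDemo::isPrime)
                        .boxed()
                        .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(primesBelow(20)); // [2, 3, 5, 7, 11, 13, 17, 19]
    }
}
```

Adjacent elements of `primesBelow(n)` that differ by 2 are exactly the twin-prime pairs the article looks for.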

July 15, 2019 · 1 min · jiezi

A Java grid-output class exercise (stream)

When testing, I often need to print information to the console, but the format tends to be messy. I wanted something like the grid output of a SQL client, so after consulting some material I wrote a simple console grid-output class myself; the code is shared below for reference.

Usage: for now it supports displaying data of two types, map and list, and does not provide a header feature.

```java
public static void main(String[] args) {
    List<String> ss0 = Arrays.asList("234", "432", "54");
    List<String> ss3 = Arrays.asList("234", "432", "54", "54", "54");
    List<String> ss1 = Arrays.asList("6546", "7675");
    Map<String, String> sss = new HashMap<>();
    sss.put(getNanoMark() + EMPTY, "fdsf");
    sss.put(getNanoMark() + EMPTY, "fdsfdsaff");
    sss.put(getNanoMark() + EMPTY, "fdsf");
    sss.put(getNanoMark() + EMPTY, "fdsfafdsf");
    sss.put(getNanoMark() + EMPTY, "fdsf");
    sss.put(getMark() + EMPTY, "fdsf");
    show(sss);
    List<List<String>> rows = Arrays.asList(ss1, ss3, ss0);
    show(rows);
    JSONObject json = new JSONObject();
    json.put("3234", 32432);
    json.put("323dsa4", 32432);
    json.put("3fdsa234", 32432);
    json.put("323fdsf4", 32432);
    json.put("32d34", 32432);
    json.put("32fdsafdf34", 32432);
    show(json);
}
```

Result display: ...

July 14, 2019 · 2 min · jiezi

Concise and convenient collection processing: Java 8 stream

Background

Java 8 has been out for several years, and a while ago Java 12 was released, but in day-to-day work many projects' environments still sit on Java 1.7. Many of Java 8's new features are revolutionary — the various collection improvements, lambda expressions, etc. — so we should still get to know the appeal of Java 8.

Today we will learn Java 8's Stream. No theoretical foundation is needed; you can get hands-on directly.

The reason I came to streams is that I was building an analysis of users' income and spending. Initially all the statistics, filtering and grouping were to be done in SQL, straight from MySQL, and displayed from the results. But in practice we found that such frequent database access hurt performance badly and made analysis very slow. So we wanted to get all the data with a single database access, then do the analysis, statistics and filtering in memory.

Then I read the Stream API and found it was exactly what I wanted.

1. Understanding Stream

In Java we call Stream a "flow"; we often use streams to run assembly-line operations over collections. A stream works like a factory: feed the collection, the commands and some parameters onto the assembly line, and it processes them into the result we want. Such a pipeline greatly simplifies code and reduces manual steps.

2. The Stream workflow

source collection → stream → operations (filter, group, compute) → terminal operation

A stream pipeline generally flows like this: first convert the collection to a stream, then run it through various operations such as filtering, selection, grouping and computation. The final terminal operation converts it into the data we want — usually still a collection, sometimes a count, as the requirements dictate. Examples of each follow below.

3. API features by example

First, define a User class with four member variables: name, age, sex, and address:

```java
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Log4j
@Builder
public class User {
    // name
    private String name;
    // age
    private Integer age;
    // sex
    private Integer sex;
    // province/city
    private String address;
}
```

Lombok is used here to trim the entity class's boilerplate.

Then create the collection data we need — the source data:

```java
// 1. build our list
List<User> list = Arrays.asList(
        new User("钢铁侠", 40, 0, "华盛顿"),
        new User("蜘蛛侠", 20, 0, "华盛顿"),
        new User("赵丽颖", 30, 1, "湖北武汉市"),
        new User("詹姆斯", 35, 0, "洛杉矶"),
        new User("李世民", 60, 0, "山西省太原市"),
        new User("蔡徐坤", 20, 1, "陕西西安市"),
        new User("葫芦娃的爷爷", 70, 0, "山西省太原市"));
```

3.1 Filtering

1) Creating the stream: stream() / parallelStream()
   - stream(): sequential stream
   - parallelStream(): parallel stream

2) filter: filtering (T -> boolean)

For example, to filter users aged over 40, you can write it like this: ...
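The filter step the article is about to show can be sketched in a self-contained way. Since the lombok-based `User` class above isn't reproduced here, this sketch (class name `FilterDemo` is mine) works over the ages alone, using the same values as the article's user list:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilterDemo {
    // filter takes a predicate T -> boolean and keeps only matching elements.
    public static List<Integer> atLeast30(List<Integer> ages) {
        return ages.stream()
                   .filter(a -> a >= 30)
                   .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(40, 20, 30, 35, 60, 20, 70);
        System.out.println(atLeast30(ages)); // [40, 30, 35, 60, 70]
    }
}
```

With the real `User` objects the only change would be the predicate, e.g. `u -> u.getAge() >= 30`.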

June 25, 2019 · 2 min · jiezi

springCloud learning 5: Spring Cloud Stream event-driven

springcloud series index: https://www.tapme.top/blog/detail/2019-02-28-11-33 — code at the end of the article.

Think of cooking at home: while the rice cooker cooks the rice, we can wash and chop vegetables, waiting for the cooker's signal that the rice is done before going back to unplug it (or just leaving it on keep-warm) — either way, at that point we know the rice is ready and can start stir-frying. From this we can see that our everyday interaction with the world is not synchronous or linear, not a simple request-response model. It is event-driven: we continuously send, receive and process messages.

Likewise, the software world is not all request-response; asynchronous message communication is also needed. The idea of using messages for event communication is called event-driven architecture (EDA), also known as message-driven architecture (MDA). Using this kind of architecture you can build highly decoupled systems that can respond to change without being tightly coupled to particular libraries or services.

In a Spring Cloud project you can use Spring Cloud Stream to build messaging-based solutions with ease.

Why use messaging?

To answer that, let's start with an example, using the two services from before: the licensing service and the organization service. On every request to the licensing service, the licensing service queries organization information from the organization service over HTTP. Obviously this extra HTTP request takes considerable time. If the reads of organization data could be cached, the licensing service's response time would improve dramatically. But caching the data imposes 2 requirements:

1. The cached data must stay consistent across all instances of the licensing service — meaning the data cannot be cached in a service instance's own memory.
2. When an organization record is updated or deleted, the licensing service's cached data must be invalidated — to avoid reading stale data, outdated entries should be invalidated and removed as early as possible.

To satisfy the requirements above, there are two approaches:

1. A synchronous request-response model: when the organization data changes, the organization service calls a licensing-service endpoint to notify it that the data has changed, or operates on the licensing service's cache directly.
2. Event-driven: the organization service emits an asynchronous message; on receiving it, the licensing service clears the corresponding cache entry.

The synchronous request-response approach: the licensing service caches the organization data it queried from the organization service in redis; when the organization data changes, the organization service notifies the licensing service via a synchronous HTTP request that the data is stale. This approach has the following problems:

- The organization service and the licensing service are tightly coupled.
- It is inflexible: adding a new consumer of the organization service means modifying the organization service's code so that it notifies the new service of data changes.

The messaging approach: likewise, the licensing service caches organization data queried from the organization service in redis; when the organization data is updated, the organization service writes the update message to a queue, and the licensing service listens on that message queue. Using messaging brings 4 benefits:

- Loose coupling: the dependency between services becomes each service's dependency on the queue, a much weaker relationship.
- Durability: messages can still be sent even after a consumer has shut down, to be processed once the consumer starts up again.
- Scalability: message senders need not wait for the consumers' responses; they can continue with their own work.
- Flexibility: a message sender need not know who will consume the message, so adding new message consumers requires no change to the sending code.

Using messaging in spring cloud

A spring cloud project can integrate messaging easily through the spring cloud stream framework. The framework's biggest strength is that it abstracts away the details of the messaging platform, so you can switch freely among the supported message queues (including Apache Kafka and RabbitMQ).

spring cloud stream architecture

Four components in spring cloud stream are involved in publishing and consuming messages, namely: ...

June 21, 2019 · 3 min · jiezi

乐字节 Java8 core features in action: the Stream flow

Hello everyone, I'm 小乐 from 乐字节. Speaking of flows, we think of phone and computer assembly lines, package-wrapping lines in logistics warehouses, and so on. If the phone, the computer or the parcel is the final result, then the various parts before assembly can be seen as the data source, and the series of processing steps in between can be seen as the stream's processing.

1. Concept

Java SE already has input/output IO streams for stream operations, while the Stream introduced in Java 8 is a new member of the Java API that allows you to process data collections declaratively. Stream provides a higher-order abstraction for computing over and expressing Java collections, in an intuitive way similar to querying data from a database with a SQL statement. Note that stream operations here can be seen as processing of collection data.

Simply put, a stream is a data channel, used to operate on the sequence of elements produced by a data source (a collection, an array, a file, etc.).

- Source: a stream uses a source that provides data, such as a collection, an array, or an input/output resource. Generating a stream from an ordered collection preserves the original order; a stream generated from a list has the same element order as the list.
- Sequence of elements: like a collection, a stream provides an interface for accessing an ordered set of values of a specific element type.
- Data processing operations: a stream's data processing supports database-like operations (selection, filtering, sorting, etc.).
- Pipelining: many stream operations themselves return a stream, so multiple operations can be linked into a data-processing pipeline.

2. Streams vs. collections

When computation happens: the data in a collection has already been computed — for example, querying user records from the database by user id in descending order and receiving the records into a list; the computation of the data was completed before it went into the collection. Data in a stream is computed on demand, according to the user's needs — like searching with a search engine: the matching entries are not all rendered at once; the best-matching 10 or 20 are shown first, and only when you click "next page" are another 10 produced. Stream computation works the same way: Stream computes and processes the data when the user needs it.

External iteration vs. internal iteration: if we compare a collection to a factory's warehouse — at first the factory's hardware is backward; to modify goods in any way, a worker personally walks into the warehouse and processes them, sometimes also moving the processed goods to another warehouse. At this stage the developer does the iteration personally, finding the needed goods one by one and processing them: this is external iteration. Once the factory develops, it installs assembly lines; the factory only has to design the appropriate line for the requirements, the workers just put the goods on the line, and then wait to receive the results — the line can even deliver the goods directly to the right warehouse. This is internal iteration: the assembly line has done the iteration for you; you only need to say what should be done (i.e. design a sensible line). In effect, the Stream introduced in Java 8 "automates" the processing of data.

3. The stream operation process

The whole stream operation is an assembly line: elements are placed on the line and processed one by one. Note that many stream operations themselves return a stream, so multiple operations can be linked directly. As the diagram shows, operations can be chained fluently, and parallel streams can additionally process the data flow in parallel.

In total, the stream operation process has three stages:

1. Creation: create the stream object from a data source.
2. Intermediate processing: selection, slicing, mapping, sorting and other intermediate operations.
3. Terminating the stream: matching, summarizing, grouping and other terminal operations.

4. Creating a stream

To operate on a stream, the corresponding stream must first be created. The main forms of stream creation are as follows:

4.1 Creating a stream from a collection

In Java 8, the Collection interface has two methods for generating streams:

- stream(): creates a sequential stream for the collection.
- parallelStream(): creates a parallel stream for the collection.

Example code:

```java
public static void main(String[] args) {
    /**
     * define collection l1 and create a sequential stream for it
     */
    List<String> l1 = Arrays.asList("周星驰", "周杰伦", "周星星", "周润发");
    // return a sequential stream
    l1.stream();
    // return a parallel stream
    l1.parallelStream();
}
```

The streams obtained above were converted from the original data. Besides this basic way of creating streams, there are also the following ways to create streams: ...
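The external-vs-internal iteration contrast described above can be made concrete. A minimal sketch (class name `IterationDemo` is mine) filtering the article's name list both ways:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class IterationDemo {
    // External iteration: we walk the warehouse ourselves and drive the loop.
    public static List<String> external(List<String> names) {
        List<String> out = new ArrayList<>();
        for (String n : names) {
            if (n.startsWith("周")) out.add(n);
        }
        return out;
    }

    // Internal iteration: the stream drives the traversal; we only declare the steps.
    public static List<String> internal(List<String> names) {
        return names.stream()
                    .filter(n -> n.startsWith("周"))
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> l = Arrays.asList("周星驰", "刘德华", "周杰伦");
        System.out.println(internal(l)); // [周星驰, 周杰伦]
    }
}
```

Both produce the same result; the difference is who owns the loop.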

June 8, 2019 · 4 min · jiezi

修炼内功 Java8: how does Stream work?

Those who have used the Stream added in Java 8 have surely already felt its convenience. It allows you to process collections declaratively, without writing tedious for-loops/while-loops, and it can process collection data in parallel at very low cost.

Suppose we need to select from the menu the dishes with fewer than 400 calories, sort them by calories, and output the dish names.

Before Java 8, this required two explicit iterations, plus intermediate storage for the partial result:

```java
List<Dish> lowCaloricDishes = new LinkedList<>();
// filter by calorie value
for (Dish dish : dishes) {
    if (dish.getCalories() < 400) {
        lowCaloricDishes.add(dish);
    }
}
// sort by calories
lowCaloricDishes.sort(new Comparator<Dish>() {
    @Override
    public int compare(Dish d1, Dish d2) {
        return d1.getCalories().compareTo(d2.getCalories());
    }
});
// extract the names
List<String> lowCaloricDishesName = new LinkedList<>();
for (Dish dish : lowCaloricDishes) {
    lowCaloricDishesName.add(dish.getName());
}
```

With the Stream API, all it takes is:

```java
List<String> lowCaloricDishesName =
        dishes.parallelStream()                              // enable parallel processing
              .filter(d -> d.getCalories() < 400)            // filter by calorie value
              .sorted(Comparator.comparing(Dish::getCalories)) // sort by calories
              .map(Dish::getName)                            // extract the names
              .collect(Collectors.toList());                 // store the result in a List
```

You can even write far more complex functionality. ...
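To illustrate the "more complex functionality" the article trails off on, here is a self-contained sketch of a richer pipeline over the same kind of menu data. The `Dish` class here is a minimal stand-in of my own (the article's full `Dish` is not reproduced), and grouping names by a derived calorie level is my example, not the article's:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DishDemo {
    // Minimal stand-in for the article's Dish (name + calories only).
    public static class Dish {
        public final String name;
        public final int calories;
        public Dish(String name, int calories) { this.name = name; this.calories = calories; }
    }

    // A more complex pipeline: group dish names by a derived calorie level.
    public static Map<String, List<String>> byLevel(List<Dish> menu) {
        return menu.stream().collect(Collectors.groupingBy(
                d -> d.calories < 400 ? "low" : "high",          // classifier
                Collectors.mapping(d -> d.name, Collectors.toList()))); // downstream collector
    }

    public static void main(String[] args) {
        List<Dish> menu = Arrays.asList(new Dish("salad", 150), new Dish("pork", 800));
        System.out.println(byLevel(menu));
    }
}
```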

May 10, 2019 · 6 min · jiezi

In depth | An introduction to the Spring Cloud Bus message bus

Following the previous article, In depth | Spring Cloud Stream architecture and principles, this time we look at another component in the Spring Cloud family: Spring Cloud Bus (it is advisable to be familiar with Spring Cloud Stream first, otherwise the internals of Spring Cloud Bus will be hard to follow).

Spring Cloud Bus positions itself as the message bus within the Spring Cloud ecosystem, using a message broker to connect all the nodes of a distributed system. The official Bus Reference documentation is rather simple — so simple it doesn't contain a single diagram. This is the code structure of the latest Spring Cloud Bus release (the codebase is quite small).

Bus demo

Before analyzing Bus's implementation, let's look at two simple examples of using Spring Cloud Bus.

Adding configuration on all nodes

The Bus example is simple, because Bus's AutoConfiguration layer ships with defaults; you only need to pull in the Spring Cloud Stream dependency for your message middleware plus the Spring Cloud Bus dependency, and then every application you start will use the same topic for receiving and sending messages. The corresponding demo is on github: https://github.com/fangjian0423/rocketmq-binder-demo/tree/master/rocketmq-bus-demo. The demo simulates starting 5 nodes; add a configuration item to any one instance, and all nodes gain that configuration item.

Hit the config-reading Controller endpoint provided on any node (the key is hangzhou):

```
curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
```

All nodes return unknown, because no node has the key hangzhou in its configuration. Bus provides the EnvironmentBusEndpoint, an Endpoint used to add/update configuration through the message broker. Hit that Endpoint's url /actuator/bus-env?name=hangzhou&value=alibaba on any node to add the configuration item (for instance on node1):

```
curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'
```

Then access /bus/env on all the nodes again to read the configuration:

```
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
unknown
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
unknown
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
unknown
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
unknown
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
unknown
$ curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
alibaba
```

As you can see, every node gained a configuration item with key hangzhou and value alibaba, accomplished through Bus's EnvironmentBusEndpoint. Here I borrow a picture drawn by 程序猿DD — Spring Cloud Config working with Bus to refresh the configuration of all nodes — to describe the example above (this article's example adds configuration rather than refreshing it, but the flow is the same).

Modifying configuration on some nodes

For example, on node1 specify the destination rocketmq-bus-node2 (node2 configured spring.cloud.bus.id as rocketmq-bus-node2:10002, so it matches) to modify the configuration:

```
curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'
```

Access /bus/env to read the configuration (since the message was sent on node1, Bus also modifies the configuration of node1, the sender):

```
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
xihu
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
xihu
```

As you can see, only node1 and node2 modified their configuration; the remaining 3 nodes are unchanged.

How Bus is implemented

Bus concepts

Events. Bus defines the remote event RemoteApplicationEvent, which extends Spring's ApplicationEvent, and it currently has 4 concrete implementations:

- EnvironmentChangeRemoteApplicationEvent: remote environment change event. Mainly used to receive data of type Map<String, String> and update it into the Environment of the Spring context. The example in this article is built on this event, together with EnvironmentBusEndpoint and EnvironmentChangeListener.
- AckRemoteApplicationEvent: remote acknowledgment event. After Bus successfully receives a remote event internally, it sends back an AckRemoteApplicationEvent to acknowledge.
- RefreshRemoteApplicationEvent: remote configuration refresh event. Works with @RefreshScope and all configuration classes annotated with @ConfigurationProperties for dynamic refreshing.
- UnknownRemoteApplicationEvent: remote unknown event. If an exception occurs while converting the Bus message body into a remote event, it is uniformly wrapped into this event.

Bus internally also has one event that is not a RemoteApplicationEvent — SentApplicationEvent, the message-sent event, which works with Trace to record the sending of remote messages.

These events work together with ApplicationListeners; for instance, EnvironmentChangeRemoteApplicationEvent is paired with EnvironmentChangeListener for adding/modifying configuration:

```java
public class EnvironmentChangeListener
        implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {

    private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);

    @Autowired
    private EnvironmentManager env;

    @Override
    public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
        Map<String, String> values = event.getValues();
        log.info("Received remote environment change request. Keys/values to update " + values);
        for (Map.Entry<String, String> entry : values.entrySet()) {
            env.setProperty(entry.getKey(), entry.getValue());
        }
    }
}
```

On receiving an EnvironmentChangeRemoteApplicationEvent sent by another node, it calls EnvironmentManager#setProperty to set the configuration. Internally, for each configuration item, that method publishes an EnvironmentChangeEvent, which is listened to by ConfigurationPropertiesRebinder to perform the rebind operation that adds/updates the configuration.

Actuator Endpoints. Bus exposes 2 Endpoints, EnvironmentBusEndpoint and RefreshBusEndpoint, for adding/modifying configuration and for a global configuration refresh. Their Endpoint ids, i.e. urls, are bus-env and bus-refresh.

Configuration. Bus's message sending inevitably involves Topic, Group and similar information, all of which is wrapped in BusProperties with the default configuration prefix spring.cloud.bus, for example:

- spring.cloud.bus.refresh.enabled enables/disables the global-refresh Listener.
- spring.cloud.bus.env.enabled enables/disables the configuration add/modify Endpoint.
- spring.cloud.bus.ack.enabled enables/disables sending the AckRemoteApplicationEvent event.
- spring.cloud.bus.trace.enabled enables/disables the message-trace Listener.

The Topic used for message sending defaults to springCloudBus and can be changed via configuration; the Group can be set to broadcast mode, or use a UUID together with offset latest. Every Bus application has a corresponding Bus id; the official derivation is rather complex:

```
${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.application.index:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}}
```

Configuring the Bus id manually is recommended, because the destination in Bus remote events is matched against the Bus id:

```
spring.cloud.bus.id=${spring.application.name}-${server.port}
```

Analysis of Bus internals

Analyzing the Bus internals boils down to these questions:

1. How are messages sent?
2. How are messages received?
3. How is the destination matched?
4. After a remote event is received, how is the next action triggered?

The BusAutoConfiguration auto-configuration class is annotated with @EnableBinding(SpringCloudBusClient.class). The usage of @EnableBinding was explained in the previous article, In depth | Spring Cloud Stream architecture and principles; with value SpringCloudBusClient.class, proxies are created for the input and output DirectChannels declared in SpringCloudBusClient:

```java
public interface SpringCloudBusClient {

    String INPUT = "springCloudBusInput";

    String OUTPUT = "springCloudBusOutput";

    @Output(SpringCloudBusClient.OUTPUT)
    MessageChannel springCloudBusOutput();

    @Input(SpringCloudBusClient.INPUT)
    SubscribableChannel springCloudBusInput();
}
```

The properties of the springCloudBusInput and springCloudBusOutput bindings can be modified through the configuration file (e.g. changing the topic):

```yaml
spring.cloud.stream.bindings:
  springCloudBusInput:
    destination: my-bus-topic
  springCloudBusOutput:
    destination: my-bus-topic
```

Receiving and sending messages:

```java
// BusAutoConfiguration

@EventListener(classes = RemoteApplicationEvent.class) // 1
public void acceptLocal(RemoteApplicationEvent event) {
    if (this.serviceMatcher.isFromSelf(event)
            && !(event instanceof AckRemoteApplicationEvent)) { // 2
        this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // 3
    }
}

@StreamListener(SpringCloudBusClient.INPUT) // 4
public void acceptRemote(RemoteApplicationEvent event) {
    if (event instanceof AckRemoteApplicationEvent) {
        if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                && this.applicationEventPublisher != null) { // 5
            this.applicationEventPublisher.publishEvent(event);
        }
        // If it's an ACK we are finished processing at this point
        return;
    }
    if (this.serviceMatcher.isForSelf(event)
            && this.applicationEventPublisher != null) { // 6
        if (!this.serviceMatcher.isFromSelf(event)) { // 7
            this.applicationEventPublisher.publishEvent(event);
        }
        if (this.bus.getAck().isEnabled()) { // 8
            AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
                    this.serviceMatcher.getServiceId(),
                    this.bus.getAck().getDestinationService(),
                    event.getDestinationService(), event.getId(), event.getClass());
            this.cloudBusOutboundChannel
                    .send(MessageBuilder.withPayload(ack).build());
            this.applicationEventPublisher.publishEvent(ack);
        }
    }
    if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // 9
        // We are set to register sent events so publish it for local consumption,
        // irrespective of the origin
        this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
                event.getOriginService(), event.getDestinationService(),
                event.getId(), event.getClass()));
    }
}
```

1. Uses Spring's event-listening mechanism to listen to all local RemoteApplicationEvent remote events (for example, bus-env publishes an EnvironmentChangeRemoteApplicationEvent locally and bus-refresh a RefreshRemoteApplicationEvent; both are picked up here).
2. Checks that the locally received event is not an AckRemoteApplicationEvent remote acknowledgment event (otherwise it would loop forever, endlessly receiving and sending messages) and that the event was sent by the application itself (the sender is the application itself); if both hold, execute step 3.
3. Builds a Message with the remote event as payload, then sends the message to the broker through the MessageChannel constructed by Spring Cloud Stream with binding name springCloudBusOutput.
4. The @StreamListener annotation consumes the MessageChannel constructed by Spring Cloud Stream with binding name springCloudBusInput; the messages received are remote messages.
5. If the remote event is an AckRemoteApplicationEvent, the application has the trace switch on, and the event was not sent by the application itself (the sender is not the application itself, i.e. it was sent by another application), then publish the AckRemoteApplicationEvent locally, indicating that the application acknowledges receiving a remote event from another application; the flow ends.
6. If the remote event was sent by another application to this application (the receiver of the event is the application itself), do steps 7 and 8; otherwise go to step 9.
7. If the remote event was not sent by the application itself, publish it locally. (The application's own events were already handled locally by their corresponding receivers at the very start, so there is no need to publish them again.)
8. If the AckRemoteApplicationEvent switch is on, construct an AckRemoteApplicationEvent and publish it both remotely and locally (locally because step 5 did not publish the application's own Ack event locally — in other words the application acknowledging itself; remotely to tell other applications that this application received the message).
9. If the message-trace switch is on, construct and publish a SentApplicationEvent locally.

After bus-env fires, the EnvironmentChangeListener on every node detects the configuration change, and every console prints:

```
o.s.c.b.event.EnvironmentChangeListener : Received remote environment change request. Keys/values to update {hangzhou=alibaba}
```

If you listen locally for the remote acknowledgment event AckRemoteApplicationEvent, you receive the information of all nodes; for instance, node5's console logs AckRemoteApplicationEvents like these (node2 through node4 log analogous entries with their own ids):

```
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670484,"originService":"rocketmq-bus-node5:10005","destinationService":"**","id":"375f0426-c24e-4904-bce1-5e09371fc9bc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670184,"originService":"rocketmq-bus-node1:10001","destinationService":"**","id":"91f06cf1-4bd9-4dd8-9526-9299a35bb7cc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
```

So, back to the 4 questions raised at the start of this section:

1. How are messages sent: in BusAutoConfiguration#acceptLocal, events are sent through Spring Cloud Stream to the springCloudBus topic.
2. How are messages received: in BusAutoConfiguration#acceptRemote, messages on the springCloudBus topic are received through Spring Cloud Stream.
3. How is the destination matched: in BusAutoConfiguration#acceptRemote, the remote-event receiving method matches the destination.
4. After a remote event is received, how is the next action triggered: internally, Bus uses Spring's event mechanism to receive the concrete local implementations of RemoteApplicationEvent and take the next step (for example, EnvironmentChangeListener receives the EnvironmentChangeRemoteApplicationEvent, and RefreshListener receives the RefreshRemoteApplicationEvent).

Summary

Spring Cloud Bus itself does not contain much; you do need prior knowledge of the Spring Cloud Stream ecosystem and Spring's own event mechanism, on top of which you can better understand how Spring Cloud Bus handles local and remote events. Currently Bus has few built-in remote events, most of them configuration-related; we can extend RemoteApplicationEvent ourselves and use the @RemoteApplicationEventScan annotation to build a message system for our own microservices.

Author: 中间件小哥. Read the original. This article is original content of Yunqi Community (云栖社区) and may not be reproduced without permission. ...

April 15, 2019 · 4 min · jiezi

乐字节 — Java8 new features: Stream (part 2)

接上一篇:《Java8新特性之stream》,下面继续接着讲Stream5、流的中间操作常见的流的中间操作,归为以下三大类:筛选和切片流操作、元素映射操作、元素排序操作:5.1、筛选和切片例如以订单数据为例,在做报表展示时,会根据订单状态、用户信息、支付结果等状态来分别展示(即过滤和统计展示)定义订单Order类public class Order { // 订单id private Integer id; // 订单用户id private Integer userId; // 订单编号 private String orderNo; // 订单日期 private Date orderDate; // 收货地址 private String address; // 创建时间 private Date createDate; // 更新时间 private Date updateDate; // 订单状态 0-未支付 1-已支付 2-代发货 3-已发货 4-已接收 5-已完成 private Integer status; // 是否有效 1-有效订单 0-无效订单 private Integer isValid; //订单总金额 private Double total; /** 此处省略getter/setter方法 /}测试public static void main(String[] args) { Order order01 = new Order(1,10,“20190301”, new Date(),“上海市-浦东区”,new Date(),new Date(),4,1,100.0); Order order02 = new Order(2,30,“20190302”, new Date(),“北京市四惠区”,new Date(),new Date(),1,1,2000.0); Order order03 = new Order(3,20,“20190303”, new Date(),“北京市-朝阳区”,new Date(),new Date(),4,1,500.0); Order order04 = new Order(4,40,“20190304”, new Date(),“北京市-大兴区”,new Date(),new Date(),4,0,256.0); Order order05 = new Order(5,40,“20190304”, new Date(),“上海市-松江区”,new Date(),new Date(),4,0,1000.0); List<Order> ordersList= Arrays.asList(order01,order02,order03,order04); // 过滤订单集合 有效订单 并打印到控制台 ordersList.stream().filter((order)->order.getIsValid()==1).forEach(System.out::println); // 过滤订单集合有效订单 取前两条有效订单 并打印到控制台 ordersList.stream().filter((order)->order.getIsValid()==1).limit(2).forEach(System.out::println); } // 过滤订单集合有效订单 取最后一条记录 ordersList.stream().filter((order)->order.getIsValid()==1) .skip(ordersList.size()-2).forEach(System.out::println);// 去除订单编号重复的无效订单记录 此时因为比较的为Object Order对象需要重写HashCode 与Equals 方法/* * 重写 equals 方法 * @param obj * @return / @Override public boolean equals(Object obj) { boolean flag = false; if (obj == null) { return flag; } Order order = (Order) obj; if (this == order) { return true; } else { return (this.orderNo.equals(order.orderNo)); } } /* * 重写hashcode方法 * @return */ @Override public int hashCode() { int hashno = 7; hashno = 13 * hashno + (orderNo 
== null ? 0 : orderNo.hashCode()); return hashno; } // 过滤订单集合无效订单 去除订单号重复记录 ordersList.stream().filter((order)->order.getIsValid()==0).distinct().forEach(System.out::println);5.2、映射//过滤订单集合有效订单 获取所有订单订单编号ordersList.stream().filter((order)->order.getIsValid()==1).map((order)->order.getOrderNo()).forEach(System.out::println);// 过滤有效订单 并分离每个订单下收货地址市区信息ordersList.stream().map(o->o.getAddress().split("-")).flatMap(Arrays::stream).forEach(System.out::println);5.3、排序 //过滤有效订单 并根据用户id 进行排序 ordersList.stream().filter((order)->order.getIsValid()==1) .sorted((o1,o2)->o1.getUserId()-o2.getUserId()).forEach(System.out::println);//或者等价写法ordersList.stream().filter((order)->order.getIsValid()==1) .sorted(Comparator.comparingInt(Order::getUserId)).forEach(System.out::println);// 定制排序规则/过滤有效订单 * 定制排序:如果订单状态相同 根据订单创建时间排序 反之根据订单状态排序/ordersList.stream().filter((order)->order.getIsValid()==1).sorted((o1,o2)->{ if(o1.getStatus().equals(o2.getStatus())){ return o1.getCreateDate().compareTo(o2.getCreateDate()); }else{ return o1.getStatus().compareTo(o2.getStatus()); }}).forEach(System.out::println);6、流的终止操作终止操作会从流的流水线生成结果。其结果是任何不是流的值,比如常见的List、 Integer,甚 至void等结果。对于流的终止操作,分为以下三类:6.1、查找与匹配 // 筛选所有有效订单 匹配用户id =20 的所有订单System.out.println(“allMatch匹配结果:"+ordersList.stream(). filter((order) -> order.getIsValid() == 1).allMatch((o) -> o.getUserId() == 20));System.out.println(“anyMatch匹配结果:"+ordersList.stream(). filter((order) -> order.getIsValid() == 1).anyMatch((o) -> o.getUserId() == 20));System.out.println(“noneMatch匹配结果:"+ordersList.stream(). filter((order) -> order.getIsValid() == 1).noneMatch((o) -> o.getUserId() == 20));// 筛选所有有效订单 返回订单总数System.out.println(“count结果:"+ordersList.stream(). 
filter((order) -> order.getIsValid() == 1).count());// 筛选所有有效订单 返回金额最大订单值Optional<Double> max=ordersList.stream().filter((order) -> order.getIsValid() == 1) .map(Order::getTotal).max(Double::compare);System.out.println("订单金额最大值:"+max.get());// 筛选所有有效订单 返回金额最小订单值Optional<Double> min=ordersList.stream().filter((order) -> order.getIsValid() == 1) .map(Order::getTotal).min(Double::compare);System.out.println("订单金额最小值:"+min.get());6.2、归约将流中元素反复结合起来,得到一个值的操作。// 归约操作 计算有效订单总金额System.out.println("有效订单总金额:"+ordersList.stream().filter((order) -> order.getIsValid() == 1).map(Order::getTotal).reduce(Double::sum).get());6.3、Collector收集数据6.3.1、收集将流转换为其他形式,collect 方法作为终端操作,接收一个Collector接口的实现,用于对Stream中元素做汇总。最常用的是把流中所有元素收集到一个 List、Set 或 Collection 中:toListtoSettoCollectiontoMap// 收集操作// 筛选所有有效订单 并收集订单列表List<Order> orders= ordersList.stream().filter((order) -> order.getIsValid() == 1).collect(Collectors.toList());orders.forEach(System.out::println);// 筛选所有有效订单 并收集订单号 与 订单金额Map<String,Double> map=ordersList.stream().filter((order) -> order.getIsValid() == 1).collect(Collectors.toMap(Order::getOrderNo, Order::getTotal));// java8 下对map 进行遍历操作 注意:如果 Map 的 Key 重复了,toMap 会抛出异常map.forEach((k,v)->{System.out.println("k:"+k+" v:"+v);});6.3.2、汇总counting():用于统计元素总数count():同样用于统计元素总数(推荐使用,写法更简洁)summingInt(),summingLong(),summingDouble():用于计算总和averagingInt(),averagingLong(),averagingDouble():用于计算平均值summarizingInt,summarizingLong,summarizingDouble 可以一次性得到总数、总和、平均值等汇总信息,比如summarizingInt 结果会返回IntSummaryStatistics 类型,然后通过对应的get方法获取汇总值即可// 汇总操作//筛选所有有效订单 返回订单总数System.out.println("count结果:"+ordersList.stream(). filter((order) -> order.getIsValid() == 1).collect(Collectors.counting()));System.out.println("count结果:"+ordersList.stream(). filter((order) -> order.getIsValid() == 1).count());// 返回订单金额汇总统计(summarizingDouble 返回 DoubleSummaryStatistics,包含总和、均值等)System.out.println("订单金额统计:"+ordersList.stream(). filter((order) -> order.getIsValid() == 1).collect(Collectors.summarizingDouble(Order::getTotal)));System.out.println("订单总金额:"+ordersList.stream(). filter((order) -> order.getIsValid() == 1).mapToDouble(Order::getTotal).sum());System.out.println("订单总金额:"+ordersList.stream(). filter((order) -> order.getIsValid() == 1).map(Order::getTotal).reduce(Double::sum).get());// 返回 用户id=20 有效订单平均每笔消费金额System.out.println("用户id=20 有效订单平均每笔消费金额:"+ordersList.stream(). filter((order) -> order.getIsValid() == 1). filter((order -> order.getUserId()==20)) .collect(Collectors.averagingDouble(Order::getTotal)));System.out.println("用户id=20 有效订单平均每笔消费金额:"+ ordersList.stream(). filter((order) -> order.getIsValid() == 1). filter((order -> order.getUserId()==20)) .mapToDouble(Order::getTotal).average().getAsDouble());System.out.println("用户id=20 有效订单平均每笔消费金额:"+ ordersList.stream(). filter((order) -> order.getIsValid() == 1). filter((order -> order.getUserId()==20)) .collect(Collectors.summarizingDouble(Order::getTotal)).getAverage());// 筛选所有有效订单 并计算订单总金额System.out.println("订单总金额:"+ordersList.stream().filter((order) -> order.getIsValid() == 1) .collect(Collectors.summingDouble(Order::getTotal)));// 筛选所有有效订单 并计算最小订单金额System.out.println("最小订单金额:"+ordersList.stream().filter((order) -> order.getIsValid() == 1) .map(Order::getTotal).collect(Collectors.minBy(Double::compare)));// 筛选所有有效订单 并计算最大订单金额System.out.println("最大订单金额:"+ordersList.stream().filter((order) -> order.getIsValid() == 1) .map(Order::getTotal).collect(Collectors.maxBy(Double::compare)));6.3.3、最值maxBy,minBy 两个方法,需要一个 Comparator 接口作为参数,实现最大、最小值的获取操作// 取最值// 筛选所有有效订单 并计算最小订单金额System.out.println("最小订单金额:"+ordersList.stream().filter((order) -> order.getIsValid() == 1) .map(Order::getTotal).collect(Collectors.minBy(Double::compare)));// 筛选所有有效订单 并计算最大订单金额System.out.println("最大订单金额:"+ordersList.stream().filter((order) -> order.getIsValid() == 1) .map(Order::getTotal).collect(Collectors.maxBy(Double::compare)));6.3.4、分组groupingBy 用于将数据分组,最终返回一个 Map 类型;groupingBy 可以接受第二个参数实现多级分组// 分组-根据有效订单支付状态进行分组操作Map<Integer,List<Order>> g01=ordersList.stream().filter((order) -> order.getIsValid() ==
1) .collect(Collectors.groupingBy(Order::getStatus));g01.forEach((status,order)->{ System.out.println(”—————-”); System.out.println(“订单状态:"+status); order.forEach(System.out::println);});// 分组-查询有效订单 根据用户id 和 支付状态进行分组Map<Integer,Map<String,List<Order>>> g02= ordersList.stream().filter((order) -> order.getIsValid() == 1) .collect(Collectors.groupingBy(Order::getUserId,Collectors.groupingBy((o)->{ if(o.getStatus()==0){ return “未支付”; }else if (o.getStatus()==1){ return “已支付”; }else if (o.getStatus()==2){ return “待发货”; }else if (o.getStatus()==3){ return “已发货”; }else if (o.getStatus()==4){ return “已接收”; } else{ return “已完成”; } })));g02.forEach((userId,m)->{ System.out.println(“用户id:"+userId+”–>有效订单如下:”); m.forEach((status,os)->{ System.out.println(“状态:"+status+”—订单列表如下:”); os.forEach(System.out::println); }); System.out.println(”———————–”);});6.3.5、partitioningBy 分区分区与分组的区别在于,分区是按照 true 和 false 来分的,因此partitioningBy 接受的参数的 lambda 也是 T -> boolean // 分区操作 筛选订单金额>1000 的有效订单Map<Boolean,List<Order>> g03= ordersList.stream().filter((order) -> order.getIsValid() == 1) .collect(Collectors.partitioningBy((o)->o.getTotal()>1000));g03.forEach((b,os)->{ System.out.println(“分区结果:"+b+”–列表结果:”); os.forEach(System.out::println);});// 拼接操作 筛选有效订单 并进行拼接String orderStr=ordersList.stream().filter((order) -> order.getIsValid() == 1).map(Order::getOrderNo) .collect(Collectors.joining(”,”));System.out.println(orderStr);乐字节-Java新特性之stream流就介绍到这里了,接下来小乐还会接着给大家讲解Java8新特性之Optional,欢迎关注,转载请说明出处和作者。 ...
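顺着上文分组、分区与拼接的思路,下面给出一个可独立运行的最小示例(示例中的几笔订单金额是随意假设的,并非原文完整的 Order 类),演示 partitioningBy 与 joining 的典型结果:

```java
import java.util.*;
import java.util.stream.*;

public class GroupPartitionDemo {
    public static void main(String[] args) {
        // 假设的三笔订单金额
        List<Double> totals = Arrays.asList(100.0, 2000.0, 500.0);
        // partitioningBy:按布尔条件把流一分为二,key 固定为 true/false
        Map<Boolean, List<Double>> parts = totals.stream()
                .collect(Collectors.partitioningBy(t -> t > 1000));
        System.out.println(parts.get(true));  // [2000.0]
        System.out.println(parts.get(false)); // [100.0, 500.0]
        // joining:把订单号拼接成一个字符串,自动加分隔符
        String joined = Stream.of("20190301", "20190302")
                .collect(Collectors.joining(","));
        System.out.println(joined); // 20190301,20190302
    }
}
```

与 groupingBy 相比,partitioningBy 的结果 Map 一定且只会有 true/false 两个 key,即使某一组为空也会得到空列表。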

April 10, 2019 · 3 min · jiezi

乐字节-Java8新特性之Stream流(上)

上一篇文章,小乐给大家介绍了《Java8新特性之方法引用》,下面接下来小乐将会给大家介绍Java8新特性之Stream,称之为流,本篇文章为上半部分。1、什么是流?Java Se中对于流的操作有输入输出IO流,而Java8中引入的Stream 属于Java API中的一个新成员,它允许你以声明性方式处理数据集合,Stream 使用一种类似 SQL 语句从数据库查询数据的直观方式来提供一种对 Java 集合运算和表达的高阶抽象。 注意这里的流操作可以看做是对集合数据的处理。简单来说,流是一种数据渠道,用于操作数据源(集合、数组)所生产的元素序列。元素序列 :就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。源:流会使用一个提供数据的源,如集合、数组或输入/输出资源。 请注意,从有序集 合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致数据处理操作:流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中 的常用操作,如filter、 map、 reduce、 find、 match、 sort等。流操作可以顺序执 行,也可并行执行。流水线:很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大 的流水线。内部迭代:与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。2、流操作整个流操作就是一条流水线,将元素放在流水线上一个个地进行处理。需要注意的是:很多流操作本身就会返回一个流,所以多个操作可以直接连接起来, 如下图这样,操作可以进行链式调用,并且并行流还可以实现数据流并行处理操作。3、流与集合3.1、计算的时机Stream 和集合的其中一个差异在于什么时候进行计算,集合,它会包含当前数据结构中所有的值,你可以随时增删,但是集合里面的元素毫无疑问地都是已经计算好了的。 流则是按需计算,按照使用者的需要计算数据,你可以想象我们通过搜索引擎进行搜索,搜索出来的条目并不是全部呈现出来的,而且先显示最符合的前 10 条或者前 20 条,只有在点击 “下一页” 的时候,才会再输出新的 10 条。 5.3.2、外部迭代与内部迭代把集合比作一个工厂的仓库,一开始工厂比较落后,要对货物作什么修改,只能工人亲自走进仓库对货物进行处理,有时候还要将处理后的货物放到一个新的仓库里面。在这个时期,我们需要亲自去做迭代,一个个地找到需要的货物,并进行处理,这叫做外部迭代。 后来工厂发展了起来,配备了流水线作业,只要根据需求设计出相应的流水线,然后工人只要把货物放到流水线上,就可以等着接收成果了,而且流水线还可以根据要求直接把货物输送到相应的仓库。这就叫做内部迭代,流水线已经帮你把迭代给完成了,你只需要说要干什么就可以了(即设计出合理的流水线)。 Java 8 引入 Stream 很大程度是因为,流的内部迭代可以自动选择一种合适你硬件的数据表示和并行实现。 4、创建流在 Java 8 中, 集合接口有两个方法来生成流:stream() − 为集合创建串行流。parallelStream() − 为集合创建并行流。示例代码如下:public static void main(String[] args) { /** * 定义集合l1 并为集合创建串行流 */ List<String> l1 = Arrays.asList(“周星驰”, “周杰伦”, “周星星”, “周润发”); // 返回串行流 l1.stream(); // 返回并行流 l1.parallelStream();}上述操作得到的流是通过原始数据转换过来的流,除了这种流创建的基本操作外,对于流的创建还有以下几种方式。4.1、值创建流Stream.of(T…) : Stream.of(“aa”, “bb”) 生成流//值创建流 生成一个字符串流Stream<String> stream = Stream.of(“java8”, “Spring”, “SpringCloud”);stream.forEach(System.out::println);4.2、数组创建流根据参数的数组类型创建对应的流。Arrays.stream(T[ ])Arrays.stream(int[ ])Arrays.stream(double[ ])Arrays.stream(long[ ]) // 只取索引第 1 到第 2 位的: int[] a = {1, 2, 3, 4}; Arrays.stream(a, 1, 3).forEach(System.out :: println);4.3、文件生成流//每个元素是给定文件的其中一行Stream<String> stream02 = 
Files.lines(Paths.get("data.txt"));4.4、函数生成流两个方法:iterate:依次对每个新生成的值应用函数generate:接受一个供给型函数(Supplier),生成一个新的值 //生成流,首元素为 0,之后依次加 2 Stream.iterate(0, n -> n + 2) //生成流,为 0 到 1 的随机双精度数 Stream.generate(Math::random) //生成流,元素全为 1 Stream.generate(() -> 1)上半部分就介绍到这里,下半部分将会给大家介绍流的中间操作和终止操作。请多关注!转载请注明出处和作者。 ...
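文中提到 iterate 与 generate 生成的是无限流,实际使用时必须配合 limit 等短路操作截断,否则终止操作永远不会结束。下面是一个可运行的小例子(纯演示代码,非原文内容):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CreateStreamDemo {
    public static void main(String[] args) {
        // iterate 生成无限偶数流,用 limit 截断为前 5 个
        List<Integer> evens = Stream.iterate(0, n -> n + 2)
                .limit(5)
                .collect(Collectors.toList());
        System.out.println(evens); // [0, 2, 4, 6, 8]
    }
}
```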

April 9, 2019 · 1 min · jiezi

Java 8中处理集合的优雅姿势——Stream

在Java中,集合和数组是我们经常会用到的数据结构,需要经常对他们做增、删、改、查、聚合、统计、过滤等操作。相比之下,关系型数据库中也同样有这些操作,但是在Java 8之前,集合和数组的处理并不是很便捷。不过,这一问题在Java 8中得到了改善,Java 8 API添加了一个新的抽象称为流Stream,可以让你以一种声明的方式处理数据。本文就来介绍下如何使用Stream。特别说明一下,关于Stream的性能及原理不是本文的重点,如果大家感兴趣后面会出文章单独介绍。Stream介绍Stream 使用一种类似用 SQL 语句从数据库查询数据的直观方式来提供一种对 Java 集合运算和表达的高阶抽象。Stream API可以极大提高Java程序员的生产力,让程序员写出高效率、干净、简洁的代码。这种风格将要处理的元素集合看作一种流,流在管道中传输,并且可以在管道的节点上进行处理,比如筛选,排序,聚合等。Stream有以下特性及优点:无存储。Stream不是一种数据结构,它只是某种数据源的一个视图,数据源可以是一个数组,Java容器或I/O channel等。为函数式编程而生。对Stream的任何修改都不会修改背后的数据源,比如对Stream执行过滤操作并不会删除被过滤的元素,而是会产生一个不包含被过滤元素的新Stream。惰式执行。Stream上的操作并不会立即执行,只有等到用户真正需要结果的时候才会执行。可消费性。Stream只能被“消费”一次,一旦遍历过就会失效,就像容器的迭代器那样,想要再次遍历必须重新生成。我们举一个例子,来看一下到底Stream可以做什么事情:上面的例子中,获取一些带颜色塑料球作为数据源,首先过滤掉红色的、把它们融化成随机的三角形。再过滤器并删除小的三角形。最后计算出剩余图形的周长。如上图,对于流的处理,主要有三种关键性操作:分别是流的创建、中间操作(intermediate operation)以及最终操作(terminal operation)。Stream的创建在Java 8中,可以有多种方法来创建流。1、通过已有的集合来创建流在Java 8中,除了增加了很多Stream相关的类以外,还对集合类自身做了增强,在其中增加了stream方法,可以将一个集合类转换成流。List<String> strings = Arrays.asList(“Hollis”, “HollisChuang”, “hollis”, “Hello”, “HelloWorld”, “Hollis”);Stream<String> stream = strings.stream();以上,通过一个已有的List创建一个流。除此以外,还有一个parallelStream方法,可以为集合创建一个并行流。这种通过集合创建出一个Stream的方式也是比较常用的一种方式。2、通过Stream创建流可以使用Stream类提供的方法,直接返回一个由指定元素组成的流。Stream<String> stream = Stream.of(“Hollis”, “HollisChuang”, “hollis”, “Hello”, “HelloWorld”, “Hollis”);如以上代码,直接通过of方法,创建并返回一个Stream。Stream中间操作Stream有很多中间操作,多个中间操作可以连接起来形成一个流水线,每一个中间操作就像流水线上的一个工人,每人工人都可以对流进行加工,加工后得到的结果还是一个流。以下是常用的中间操作列表:filterfilter 方法用于通过设置的条件过滤出元素。以下代码片段使用 filter 方法过滤掉空字符串:List<String> strings = Arrays.asList(“Hollis”, “”, “HollisChuang”, “H”, “hollis”);strings.stream().filter(string -> !string.isEmpty()).forEach(System.out::println);//Hollis, , HollisChuang, H, hollismapmap 方法用于映射每个元素到对应的结果,以下代码片段使用 map 输出了元素对应的平方数:List<Integer> numbers = Arrays.asList(3, 2, 2, 3, 7, 3, 5);numbers.stream().map( i -> i*i).forEach(System.out::println);//9,4,4,9,49,9,25limit/skiplimit 返回 Stream 的前面 n 个元素;skip 则是扔掉前 n 个元素。以下代码片段使用 limit 
方法保留4个元素:List<Integer> numbers = Arrays.asList(3, 2, 2, 3, 7, 3, 5);numbers.stream().limit(4).forEach(System.out::println);//3,2,2,3sortedsorted 方法用于对流进行排序。以下代码片段使用 sorted 方法进行排序:List<Integer> numbers = Arrays.asList(3, 2, 2, 3, 7, 3, 5);numbers.stream().sorted().forEach(System.out::println);//2,2,3,3,3,5,7distinctdistinct主要用来去重,以下代码片段使用 distinct 对元素进行去重:List<Integer> numbers = Arrays.asList(3, 2, 2, 3, 7, 3, 5);numbers.stream().distinct().forEach(System.out::println);//3,2,7,5接下来我们通过一个例子和一张图,来演示下,当一个Stream先后通过filter、map、sorted、limit以及distinct处理后会发生什么。代码如下:List<String> strings = Arrays.asList("Hollis", "HollisChuang", "hollis", "Hello", "HelloWorld", "Hollis");Stream s = strings.stream().filter(string -> string.length()<= 6).map(String::length).sorted().limit(3) .distinct();过程及每一步得到的结果如下图:Stream最终操作Stream的中间操作得到的结果还是一个Stream,那么如何把一个Stream转换成我们需要的类型呢?比如计算出流中元素的个数、将流转换成集合等。这就需要最终操作(terminal operation)。最终操作会消耗流,产生一个最终结果。也就是说,在最终操作之后,不能再次使用流,也不能再使用任何中间操作,否则将抛出异常:java.lang.IllegalStateException: stream has already been operated upon or closed。俗话说,"你永远不会两次踏入同一条河"也正是这个意思。常用的最终操作如下图:forEachStream 提供了方法 forEach 来迭代流中的每个数据。以下代码片段使用 forEach 输出了10个随机数:Random random = new Random();random.ints().limit(10).forEach(System.out::println);countcount用来统计流中的元素个数。List<String> strings = Arrays.asList("Hollis", "HollisChuang", "hollis","Hollis666", "Hello", "HelloWorld", "Hollis");System.out.println(strings.stream().count());//7collectcollect就是一个归约操作,可以接收各种收集器(Collector)作为参数,将流中的元素累积成一个汇总结果:List<String> strings = Arrays.asList("Hollis", "HollisChuang", "hollis","Hollis666", "Hello", "HelloWorld", "Hollis");strings = strings.stream().filter(string -> string.startsWith("Hollis")).collect(Collectors.toList());System.out.println(strings);//Hollis, HollisChuang, Hollis666, Hollis接下来,我们还是使用一张图,来演示下,前文的例子中,当一个Stream先后通过filter、map、sorted、limit以及distinct处理后,再分别使用不同的最终操作可以得到怎样的结果:下图,展示了文中介绍的所有操作的位置、输入、输出,并使用一个案例展示了其结果。 总结本文介绍了Java 8中的Stream
的用途,优点等。还接受了Stream的几种用法,分别是Stream创建、中间操作和最终操作。Stream的创建有两种方式,分别是通过集合类的stream方法、通过Stream的of方法。Stream的中间操作可以用来处理Stream,中间操作的输入和输出都是Stream,中间操作可以是过滤、转换、排序等。Stream的最终操作可以将Stream转成其他形式,如计算出流中元素的个数、将流装换成集合、以及元素的遍历等。本文作者:hollischuang阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...
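文中那条 filter→map→sorted→limit→distinct 的流水线,可以用下面这段可独立运行的代码直接验证每一步的结果(每行注释给出该步之后的中间值):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PipelineDemo {
    public static void main(String[] args) {
        List<String> strings = Arrays.asList("Hollis", "HollisChuang", "hollis",
                "Hello", "HelloWorld", "Hollis");
        List<Integer> result = strings.stream()
                .filter(s -> s.length() <= 6) // Hollis, hollis, Hello, Hollis
                .map(String::length)          // 6, 6, 5, 6
                .sorted()                     // 5, 6, 6, 6
                .limit(3)                     // 5, 6, 6
                .distinct()                   // 5, 6
                .collect(Collectors.toList());
        System.out.println(result); // [5, 6]
    }
}
```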

March 14, 2019 · 1 min · jiezi

Lambda表达式与Stream流 (终)

package com.java.design.java8;import org.junit.Before;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import java.time.*;import java.time.format.DateTimeFormatter;import java.time.temporal.ChronoField;import java.util.*;import java.util.concurrent.CompletableFuture;import java.util.stream.Collectors;import java.util.stream.IntStream;/** * @author 陈杨 */@SpringBootTest@RunWith(SpringRunner.class)public class LambdaInfo {一、Lambda表达式与Stream流/* A lambda expression can be understood as a concise representation of an anonymous function that can be passed around: it doesn't have a name, but it has a list of parameters, a body, a return type, and also possibly a list of exceptions that can be thrown. That's one big definition; let's break it down: Anonymous: We say anonymous because it doesn't have an explicit name like a method would normally have: less to write and think about! Function: We say function because a lambda isn't associated with a particular class like a method is. But like a method, a lambda has a list of parameters, a body, a return type, and a possible list of exceptions that can be thrown. Passed around: A lambda expression can be passed as argument to a method or stored in a variable. Concise: You don't need to write a lot of boilerplate like you do for anonymous classes.// Stream : A sequence of elements from a source that supports data processing operations.
Sequence of elements Source Pipelining Internal iteration Traversable only once Collections: external interation using an interator behind the scenes*/二、初始化测试数据private List<Integer> list;@Beforepublic void init() { list = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList()); list.sort(Collections.reverseOrder());}三、各种API1.allMatch@Testpublic void testLambdaInfo() { System.out.println(">———————Match方法———————-<"); // 一、Match方法 // Returns whether all elements of this stream match the provided predicate. Optional.of(list.stream().mapToInt(Integer::intValue).allMatch(i -> i > 0)) .ifPresent(System.out::println); // Returns whether any elements of this stream match the provided predicate. Optional.of(list.stream().mapToInt(Integer::intValue).anyMatch(i -> i > 0)) .ifPresent(System.out::println); // Returns whether no elements of this stream match the provided predicate.. Optional.of(list.stream().mapToInt(Integer::intValue).noneMatch(i -> i > 0)) .ifPresent(System.out::println);2、findSystem.out.println(">——————–Find方法———————–<");// 二、Find方法// Returns an Optional describing the first element of this stream,// or an empty Optional if the stream is empty.// If the stream has no encounter order, then any element may be returned.list.stream().mapToInt(Integer::intValue).filter(i -> i > 10).findFirst() .ifPresent(System.out::println);// Returns an Optional describing some element of the stream, or an empty Optional if the stream is empty.list.stream().mapToInt(Integer::intValue).filter(i -> i > 10).findAny() .ifPresent(System.out::println);3、reduceSystem.out.println(">———————Reduce方法———————-<");// 三、Reduce方法// Performs a reduction on the elements of this stream, using the provided identity value// and an associative accumulation function, and returns the reduced value.// 求和System.out.println(list.stream().reduce(0, Integer::sum));list.stream().mapToInt(Integer::intValue).reduce(Integer::sum) .ifPresent(System.out::println);// 
求最大值System.out.println(list.stream().reduce(0, Integer::max));list.stream().mapToInt(Integer::intValue).reduce(Integer::max) .ifPresent(System.out::println);// 求最小值System.out.println(list.stream().reduce(0, Integer::min));list.stream().mapToInt(Integer::intValue).reduce(Integer::min) .ifPresent(System.out::println); System.out.println(">——————————————-<"); }4、CompletableFuture API@Testpublic void testCompletableFuture() { // 四、CompletableFuture API /* * Returns a new CompletableFuture that is asynchronously completed by a task * running in the given executor with the value obtained by calling the given Supplier. */ CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue)::sum, System.out::println); Optional.of(CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue)::sum) .complete(55)).ifPresent(System.out::println); // thenAccept 无返回值 Consumer<? super T> action CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue)::sum) .thenAccept(System.out::println); // thenApply 有返回值 Function<? super T,? extends U> fn CompletableFuture.supplyAsync(() -> list.stream().mapToInt(Integer::intValue)) .thenApply(IntStream::sum).thenAccept(System.out::println); // 对元素及异常进行处理 BiFunction<? super T, Throwable, ? extends U> fn CompletableFuture.supplyAsync(() -> list.stream().mapToInt(Integer::intValue)) .handle((i, throwable) -> “handle:\t” + i.sum()).thenAccept(System.out::println); // whenCompleteAsync 完成时执行 BiConsumer<? super T, ? 
super Throwable> action CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue)::sum) .whenCompleteAsync((value, throwable) -> System.out.println(“whenCompleteAsync:\t” + value)); // 组合CompletableFuture 将前一个结果作为后一个输入参数 (参照 组合设计模式) CompletableFuture.supplyAsync(() -> list.stream().mapToInt(Integer::intValue)) .thenCompose(i -> CompletableFuture.supplyAsync(i::sum)).thenAccept(System.out::println); // 合并CompletableFuture CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue)::sum) .thenCombine(CompletableFuture.supplyAsync(() -> list.stream() .mapToDouble(Double::valueOf).sum()), Double::sum).thenAccept(System.out::println); // 合并CompletableFuture CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue)::sum) .thenAcceptBoth(CompletableFuture.supplyAsync(list.stream() .mapToDouble(Double::valueOf)::sum), (r1, r2) -> System.out.println(“thenAcceptBoth:\t” + r1 + “\t” + r2)); // 2个CompletableFuture运行完毕后运行Runnable CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + “\tis running”); return list.stream().mapToInt(Integer::intValue).sum(); }) .runAfterBoth( CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + “\tis running”); return list.stream().mapToDouble(Double::valueOf).sum(); }), () -> System.out.println(“The 2 method have done”)); // 2个CompletableFuture 有一个运行完就执行Runnable CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + “\tis running”); return list.stream().mapToInt(Integer::intValue).sum(); }) .runAfterEither( CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + “\tis running”); return list.stream().mapToDouble(Double::valueOf).sum(); }), () -> System.out.println(“The 2 method have done”)); // 2个CompletableFuture 有一个运行完就执行Function<? 
super T, U> fn CompletableFuture.supplyAsync( list.stream().mapToInt(Integer::intValue).max()::getAsInt) .applyToEither( CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue).min()::getAsInt) , v -> v * 10) .thenAccept(System.out::println); // 2个CompletableFuture 有一个运行完就执行Consumer<? super T> action CompletableFuture.supplyAsync( list.stream().mapToInt(Integer::intValue).max()::getAsInt) .acceptEither( CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue).min()::getAsInt) , System.out::println); // 将集合中每一个元素都映射成为CompletableFuture<Integer>对象 List<CompletableFuture<Integer>> collect = list.stream().map(i -> CompletableFuture.supplyAsync(i::intValue)) .collect(ArrayList::new, ArrayList::add, ArrayList::addAll); // 集合转数组 CompletableFuture[] completableFutures = collect.toArray(CompletableFuture[]::new); // 有一个task执行完毕 CompletableFuture.anyOf(completableFutures) .thenRun(() -> System.out.println(“有一个task执行完毕—>first done”)); // 有且仅有所有task执行完毕 CompletableFuture.allOf(completableFutures) .thenRun(() -> System.out.println(“有且仅有所有task执行完毕—>done”));}5、Java.time API @Test public void testLocalDateTime() { // 五、Java.time API LocalDate localDate = LocalDate.of(2019, 12, 1); // 当前时间 Optional.of(LocalDate.now()).ifPresent(System.out::println); // 年份 Optional.of(localDate.getYear()).ifPresent(System.out::println); OptionalInt.of(localDate.get(ChronoField.YEAR)).ifPresent(System.out::println); // 月份 (Jan–>Dec) Optional.of(localDate.getMonth()).ifPresent(System.out::println); // 月份(1–>12) Optional.of(localDate.getMonthValue()).ifPresent(System.out::println); OptionalInt.of(localDate.get(ChronoField.MONTH_OF_YEAR)).ifPresent(System.out::println); // 年中的第几天 Optional.of(localDate.getDayOfYear()).ifPresent(System.out::println); OptionalInt.of(localDate.get(ChronoField.DAY_OF_YEAR)).ifPresent(System.out::println); // 月中的第几天 Optional.of(localDate.getDayOfMonth()).ifPresent(System.out::println); 
OptionalInt.of(localDate.get(ChronoField.DAY_OF_MONTH)).ifPresent(System.out::println); // 星期几(Mon–>Sun) Optional.of(localDate.getDayOfWeek()).ifPresent(System.out::println); // 星期几(1–>7) OptionalInt.of(localDate.get(ChronoField.DAY_OF_WEEK)).ifPresent(System.out::println); // 时代(公元前、后) CE BCE Optional.of(localDate.getEra()).ifPresent(System.out::println); // 时代(公元前、后) 1—>CE 0—>BCE Optional.of(localDate.getEra().getValue()).ifPresent(System.out::println); OptionalInt.of(localDate.get(ChronoField.ERA)).ifPresent(System.out::println); // ISO年表 Optional.of(localDate.getChronology().getId()).ifPresent(System.out::println); // 当前时间 LocalTime time = LocalTime.now(); // 时 OptionalInt.of(time.getHour()).ifPresent(System.out::println); OptionalInt.of(time.get(ChronoField.HOUR_OF_DAY)).ifPresent(System.out::println); // 分 OptionalInt.of(time.getMinute()).ifPresent(System.out::println); OptionalInt.of(time.get(ChronoField.MINUTE_OF_DAY)).ifPresent(System.out::println); // 秒 OptionalInt.of(time.getSecond()).ifPresent(System.out::println); OptionalInt.of(time.get(ChronoField.SECOND_OF_DAY)).ifPresent(System.out::println); // 纳秒 OptionalInt.of(time.getNano()).ifPresent(System.out::println); OptionalLong.of(time.getLong(ChronoField.NANO_OF_SECOND)).ifPresent(System.out::println); // 中午时间 Optional.of(LocalTime.NOON).ifPresent(System.out::println); // 午夜时间 Optional.of(LocalTime.MIDNIGHT).ifPresent(System.out::println); // 自定义格式化时间 DateTimeFormatter customDateTimeFormatter = DateTimeFormatter.ofPattern(“yyyy-MM-dd HH:mm:ss E”); LocalDateTime localDateTime = LocalDateTime.of(localDate, time); Optional.of(localDateTime.format(customDateTimeFormatter)).ifPresent(System.out::println); // 根据传入的文本匹配自定义指定格式进行解析 Optional.of(LocalDateTime.parse(“2019-12-25 12:30:00 周三”, customDateTimeFormatter)) .ifPresent(System.out::println); // 时间点 Instant Instant start = Instant.now(); try { Thread.sleep(10_000); } catch (InterruptedException e) { e.printStackTrace(); } Instant end = Instant.now(); // 
Duration 时间段 Duration duration = Duration.between(start, end); OptionalLong.of(duration.toNanos()).ifPresent(System.out::println); // Period 时间段 Period period = Period.between(LocalDate.now(), localDate); OptionalInt.of(period.getYears()).ifPresent(System.out::println); OptionalInt.of(period.getMonths()).ifPresent(System.out::println); OptionalInt.of(period.getDays()).ifPresent(System.out::println); // The Difference Between Duration And Period // Durations and periods differ in their treatment of daylight savings time when added to ZonedDateTime. // A Duration will add an exact number of seconds, thus a duration of one day is always exactly 24 hours. // By contrast, a Period will add a conceptual day, trying to maintain the local time. }} ...
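关于文末 Duration 与 Period 的区别,可以用下面的片段直观感受一下(日期、时间均为随意假设的演示值):Duration 表示精确到秒/纳秒的时间量,Period 表示以"年月日"为单位的概念时间量。

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.Period;

public class DurationPeriodDemo {
    public static void main(String[] args) {
        // Duration:两个时间点之间精确的时间量
        Duration d = Duration.between(LocalTime.of(8, 0), LocalTime.of(9, 30));
        System.out.println(d.toMinutes()); // 90
        // Period:两个日期之间的"年月日"差值
        Period p = Period.between(LocalDate.of(2019, 1, 1), LocalDate.of(2019, 3, 11));
        System.out.println(p.getMonths() + "个月" + p.getDays() + "天"); // 2个月10天
    }
}
```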

March 11, 2019 · 4 min · jiezi

java8 内置函数(api)总结

常用的函数接口记录方便以后翻吧接口参数返回类型说明Predicate<T>Tboolean输入某个值,输出boolean 值,用于对某值进行判定Consumer<T>Tvoid输入某值,无输出。用于消费某值Function<T,R>TR输入某类型值,输出另种类型值,用于类型转化等Supplier<T>NoneT无输入,输出某值,用于生产某值UnaryOperator<T>TT输入某类型值,输出同类型值,用于值的同类型转化,如对值进行四则运算等BinaryOperator<T>(T,T)T输入两个某类型值,输出一个同类型值,用于两值合并等PredicatesPredicates是包含一个参数的布尔值接口。其包括一些缺省方法,组合他们使用可以实现复杂的业务逻辑(如:and, or, negate)。示例代码如下:Predicate<String> predicate = (s) -> s.length() > 0; predicate.test(“foo”); // truepredicate.negate().test(“foo”); // false Predicate<Boolean> nonNull = Objects::nonNull;Predicate<Boolean> isNull = Objects::isNull; Predicate<String> isEmpty = String::isEmpty;Predicate<String> isNotEmpty = isEmpty.negate();FunctionsFunctions接口接收一个参数并产生一个结果。其缺省方法通常被用来链接多个功能一起使用 (compose, andThen)。Function<String, Integer> toInteger = Integer::valueOf;Function<String, String> backToString = toInteger.andThen(String::valueOf); backToString.apply(“123”); // “123"SuppliersSuppliers接口生成一个给定类型结果。和Functions不同,其没有接收参数。Supplier<Person> personSupplier = Person::new;personSupplier.get(); // new PersonConsumersConsumers表现执行带有单个输入参数的操作。Consumer<Person> greeter = (p) -> System.out.println(“Hello, " + p.firstName);greeter.accept(new Person(“Luke”, “Skywalker”));ComparatorsComparators是从java旧版本升级并增加了一些缺省方法。Comparator<Person> comparator = (p1, p2) -> p1.firstName.compareTo(p2.firstName); Person p1 = new Person(“John”, “Doe”);Person p2 = new Person(“Alice”, “Wonderland”); comparator.compare(p1, p2); // > 0comparator.reversed().compare(p1, p2); // < 0Stream 常用方法创建Stream将现有数据结构转化成StreamStream<Integer> s = Stream.of(1, 2, 3, 4, 5);Stream<Integer> s = Arrays.stream(arr);Stream<Integer> s = aList.stream();通过Stream.generate()方法:// 这种方法通常表示无限序列Stream<T> s = Stream.generate(SuppLier<T> s);// 创建全体自然数的Streamclass NatualSupplier implements Supplier<BigInteger> { BigInteger next = BigInteger.ZERO; @Override public BigInteger get() { next = next.add(BigInteger.ONE); return next; }}通过其他方法返回Stream<String> lines = 
Files.lines(Path.get(filename))…map方法把一种操作运算映射到Stream的每一个元素上,从而完成一个Stream到另一个Stream的转换map方法接受的对象是Function接口,这个接口是一个函数式接口:<R> Stream<R> map(Function<? super T, ? extends R> mapper);@FunctionalInterfacepublic interface Function<T, R> { // 将T转换为R R apply(T t);}使用:// 获取Stream里每个数的平方的集合Stream<Integer> ns = Stream.of(1, 2, 3, 4, 5);ns.map(n -> n * n).forEach(System.out::println);flatMapmap方法是一个一对一的映射,每输入一个数据也只会输出一个值。 flatMap方法是一对多的映射,对每一个元素映射出来的仍旧是一个Stream,然后会将这个子Stream的元素映射到父集合中:Stream<List<Integer>> inputStream = Stream.of(Arrays.asList(1), Arrays.asList(2, 3), Arrays.asList(4, 5, 6));List<Integer> integerList = inputStream.flatMap((childList) -> childList.stream()).collect(Collectors.toList());//将一个“二维数组”flat为“一维数组”integerList.forEach(System.out::println);filter方法filter方法用于过滤Stream中的元素,并用符合条件的元素生成一个新的Stream。filter方法接受的参数是Predicate接口对象,这个接口是一个函数式接口:Stream<T> filter(Predicate<? super T>) predicate;@FunctionInterfacepublic interface Predicate<T> { // 判断元素是否符合条件 boolean test(T t);}使用// 获取当前Stream所有偶数的序列Stream<Integer> ns = Stream.of(1, 2, 3, 4, 5);ns.filter(n -> n % 2 == 0).forEach(System.out::println);limit、skiplimit用于限制获取多少个结果,与数据库中的limit作用类似,skip用于排除前多少个结果。sortedsorted函数需要传入一个实现Comparator函数式接口的对象,该接口的抽象方法compare接收两个参数并返回一个整型值,作用就是排序,与其他常见排序方法一致。distinctdistinct用于剔除重复,与数据库中的distinct用法一致。findFirstfindFirst方法总是返回第一个元素,如果没有则返回空,它的返回值类型是Optional<T>类型,接触过swift的同学应该知道,这是一个可选类型,如果有第一个元素则Optional类型中保存的有值,如果没有第一个元素则该类型为空。Stream<User> stream = users.stream();Optional<String> userID = stream.filter(User::isVip).sorted((t1, t2) -> t2.getBalance() - t1.getBalance()).limit(3).map(User::getUserID).findFirst();userID.ifPresent(uid -> System.out.println(“Exists”));min、maxmin可以对整型流求最小值,返回OptionalInt。 max可以对整型流求最大值,返回OptionalInt。 这两个方法是结束操作,只能调用一次。allMatch、anyMatch、noneMatchallMatch:Stream中全部元素符合传入的predicate返回 trueanyMatch:Stream中只要有一个元素符合传入的predicate返回 truenoneMatch:Stream中没有一个元素符合传入的predicate返回 
truereduce方法reduce方法将一个Stream的每一个元素依次作用于BinaryOperator,并将结果合并。reduce方法接受的参数是BinaryOperator接口对象。Optional<T> reduce(BinaryOperator<T> accumulator);@FunctionalInterfacepublic interface BinaryOperator<T> extends BiFunction<T, T, T> { // Bi操作,两个输入,一个输出 T apply(T t, T u);}使用:// 求当前Stream累乘的结果Stream<Integer> ns = Stream.of(1, 2, 3, 4, 5);int r = ns.reduce( (x, y) -> x * y ).get();System.out.println(r); ...
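下面用一个可独立运行的小例子补充 reduce 两种重载的差异(纯演示代码):不带初始值的 reduce 返回 Optional<T>,带初始值的直接返回 T:

```java
import java.util.Optional;
import java.util.stream.Stream;

public class ReduceDemo {
    public static void main(String[] args) {
        // 不带初始值:流可能为空,所以返回 Optional
        Optional<Integer> product = Stream.of(1, 2, 3, 4, 5)
                .reduce((x, y) -> x * y);
        System.out.println(product.get()); // 120
        // 带初始值:空流时直接返回初始值,结果类型就是 T
        Integer sum = Stream.<Integer>empty().reduce(0, Integer::sum);
        System.out.println(sum); // 0
    }
}
```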

March 6, 2019 · 2 min · jiezi

详解NodeJs流之一

如果你对NodeJs系列感兴趣,欢迎关注微信公众号:前端神盾局或 github NodeJs系列文章流从早先的unix初出茅庐,在过去的几十年的时间里,它被证明是一种可依赖的编程方式,它可以将一个大型的系统拆成一些很小的部分,并且让这些部分之间完美地进行合作。在node中,流的身影几乎无处不在,无论是操作文件、创建本地服务器还是简单的console,都极有可能涉及到流。Node.js 中有四种基本的流类型:Readable - 可读取数据的流(例如 fs.createReadStream())。Writable - 可写入数据的流(例如 fs.createWriteStream())。Duplex - 可读又可写的流(例如 net.Socket)。Transform - 在读写过程中可以修改或转换数据的 Duplex 流(例如 zlib.createDeflate())为什么使用流假设我们需要使用node来实现一个简单的静态文件服务器:const http = require(‘http’);const fs = require(‘fs’);http.createServer((req,res)=>{ fs.readFile(’./test.html’,function(err,data){ if(err){ res.statusCode = 500; res.end(); }else{ res.end(data); } })}).listen(3000)上述代码简单实现了静态文件的读取和发送,逻辑上是完全可行的。但是由于readFile是一次性将读取的文件存放在内存中的,假设test.html文件非常大或者访问量增多的情况下,服务器内存很有可能耗尽。这时我们就需要使用流的方式进行改进:const http = require(‘http’);const fs = require(‘fs’);http.createServer((req,res)=>{ fs.createReadStream(’./test.html’).pipe(res);}).listen(3000);fs.createReadStream创建一个可读流,逐次读取文件内容供给下游消费,这种逐步读取和消费的方式,有效减缓了内存的消耗。可读流(Readable Stream)我们可以把 Readable Stream拆分成两个阶段:push阶段和pull阶段,在push阶段,通过实现_read方法将数据从底层数据资源池中推送到缓存池中,这是数据的生产阶段,而pull阶段,则是将缓存池的数据拉出,供下游使用,这是数据的消费阶段。在开始进一步讲解之前,我们先来介绍几个字段,这些字段来源于node源码:state.buffer: Array 缓存池,每个元素对应push(data)中的datastate.length: Number 缓存池中的数据量,在objectMode模式下,state.length === state.buffer.length,否则,其值是state.buffer中数据字节数的总和state.ended: Boolean 表示底层数据池没有可读数据了(this.pull(null))state.flowing: Null|Boolean 表示当前流的模式,其值有三种情况:null(初始状态)、true(流动模式)、false(暂停模式)state.needReadable: Boolean 是否需要触发readable事件state.reading: Boolean 是否正在读取底层数据state.sync: Boolean 是否立即触发data/readable事件,false为立即触发、true下一个tick再触发(process.nextTick)两种模式可读流存在两种模式:流动模式(flowing)和暂停模式(paused),在源码中使用state.flowing来标识。两种模式其基本流程都遵循上图中的push和pull阶段,区别在于pull阶段的自主性。对于流动模式而言,只要缓存池还有未消耗的数据,那么数据便会不断的被提取,我们可以把它想象成一个自动的水泵,只要通电了,不抽干水池的水它是不会停下来的。而对于暂停模式,它更像是打水桶,需要的时候再从水池里面打点水出来。所有可读流都开始于暂停模式,可以通过以下方式切换到流动模式:添加data事件句柄(前提是state.flowing === null)调用stream.resume()调用stream.pipe()可读流也可以通过以下方式切换回暂停模式:添加readable事件句柄如果没有管道目标,则调用 stream.pause()。如果有管道目标,则移除所有管道目标。调用 
stream.unpipe() 可以移除多个管道目标。一切从read开始对于可读流而言,消费驱动生产,只有通过调用pull阶段的read函数,才能唤醒push阶段的数据产生,从而带动整个流的运动。所以对于可读流而言read是一切的起点。这是根据源码整理的一个简单的流程图,后面将对一些环节加以说明。howMuchToRead调用read(n)过程中,node会根据实际情况调整读取的数量,实际值由howMuchRead决定function howMuchToRead(n,state){ // 如果size <= 0或者不存在可读数据 if (n <= 0 || (state.length === 0 && state.ended)) return 0; // objectMode模式下 每次制度一个单位长度的数据 if (state.objectMode) return 1; // 如果size没有指定 if (Number.isNaN(n)) { // 执行read()时,由于流动模式下数据会不断输出, // 所以每次只输出缓存中第一个元素输出,而非流动模式则会将缓存读空 if (state.flowing && state.length) return state.buffer.head.data.length; else return state.length; } if (n > state.highWaterMark) // 更新highWaterMark state.highWaterMark = computeNewHighWaterMark(n); // 如果缓存中的数据量够用 if (n <= state.length) return n; // 如果缓存中的数据不够用, // 且资源池还有可读取的数据,那么这一次先不读取缓存数据 // 留着下一次数据量足够的时候再读取 // 否则读空缓存池 if (!state.ended) { state.needReadable = true; return 0; } return state.length;}end事件在read函数调用过程中,node会择机判定是否触发end事件,判定标准主要是以下两个条件:if (state.length === 0 && state.ended) endReadable(this);底层数据(资源)没有可读数据,此时state.ended为true,通过调用pull(null)表示底层数据当前已经没有可读数据了缓存池中没有可读数据 state.length === 0本事件在调用read([size])时触发(满足上述条件时)doReaddoRead用于判断是否读取底层数据 // 如果当前是暂停模式state.needReadable var doRead = state.needReadable; // 如果当前缓存池是空的或者没有足够的缓存 if (state.length === 0 || state.length - n < state.highWaterMark){ doRead = true; } if (state.ended || state.reading) { doRead = false; } else if (doRead) { // … this._read(state.highWaterMark); // … }state.reading标志上次从底层取数据的操作是否已完成,一旦push方法被调用,就会设置为false,表示此次_read()结束data事件在官方文档中提到:添加data事件句柄,可以使Readable Stream的模式切换到流动模式,但官方没有提到的是这一结果成立的条件-state.flowing的值不为null,即只有在初始状态下,监听data事件,会使流进入流动模式。举个例子:const { Readable } = require(‘stream’);class ExampleReadable extends Readable{ constructor(opt){ super(opt); this._time = 0; } _read(){ this.push(String(++this._time)); }}const exampleReadable = new ExampleReadable();// 暂停 state.flowing === falseexampleReadable.pause();exampleReadable.on(‘data’,(chunk)=>{ console.log(Received ${chunk.length} bytes of 
data.);});运行这个例子,我们发现终端没有任何输出,为什么会这样呢?原因我们可以从源码中看出端倪 if (state.flowing !== false) this.resume();由此我们可以把官方表述再完善一些:在可读流初始化状态下(state.flowing === null),添加data事件句柄会使流进入流动模式。push只能被可读流的实现调用,且只能在 readable._read() 方法中调用。push是数据生产的核心,消费方通过调用read(n)促使流输出数据,而流通过_read()使底层调用push方法将数据传给流。在这个过程中,push方法有可能将数据存放在缓存池内,也有可能直接通过data事件输出。下面我们一一分析。如果当前流是流动的(state.flowing === true),且缓存池内没有可读数据,那么数据将直接由事件data输出// node 源码if (state.flowing && state.length === 0 && !state.sync){ state.awaitDrain = 0; stream.emit(‘data’, chunk);} 我们举个例子:const { Readable } = require(‘stream’);class ExampleReadable extends Readable{ constructor(opt){ super(opt); this.max = 100; this.time = 0; } _read(){ const seed = setTimeout(()=>{ if(this.time > 100){ this.push(null); }else{ this.push(String(++this.time)); } clearTimeout(seed); },0) }}const exampleReadable = new ExampleReadable({ });exampleReadable.on(‘data’,(data)=>{ console.log(‘from data’,data);});readable事件exampleReadable.on(‘readable’,()=>{ ….});当我们注册一个readable事件后,node就会做以下处理:将流切换到暂停模式state.flowing = false; state.needReadable = true;如果缓存池未消耗的数据,触发readable,stream.emit(‘readable’);否则,判断当前是否正在读取底层数据,如果不是,开始(nextTick)读取底层数据self.read(0);触发条件state.flow === false当前处于暂停模式缓存池中还有数据或者本轮底层数据已经读取完毕state.length || state.endedreturn !state.ended && (state.length < state.highWaterMark || state.length === 0);参考Node.js v10.15.1 文档深入理解 Node.js Stream 内部机制stream-handbook如何形象的描述反应式编程中的背压(Backpressure)机制?数据流中的积压问题Node.js Stream - 进阶篇Node Stream ...

March 1, 2019 · 2 min · jiezi

Streams and Lambda Expressions (5): Reading the Source of Stream, BaseStream, and AutoCloseable

The test scaffolding:

```java
package com.java.design.java8.Stream.StreamDetail;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;

/**
 * @author 陈杨
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class StreamDetail {

    private List<Integer> integerList;

    @Before
    public void init() {
        integerList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 0);
    }

    @Test
    public void testStreamDetail() {
        // the snippets in the sections below live in this test method
    }
}
```

1. Definition of a stream

A stream is "a sequence of elements supporting sequential and parallel aggregate operations": an element sequence that supports both serial and parallel aggregation.

2. Creating a stream

Streams are created from Collection.stream(), Collection.parallelStream(), or generators such as Stream.generate:

```java
System.out.println("Generating a stream with Stream.generate");
// Note: the original UUID.randomUUID()::toString binds a single UUID
// instance, so every generated element would be the same string;
// a lambda creates a fresh UUID per element.
Stream<String> generate = Stream.generate(() -> UUID.randomUUID().toString());
generate.findFirst().ifPresent(System.out::println);
```

3. Streams of object references

Stream<T> "is a stream of object references."

4. Stream computation

"To perform a computation, stream operations are composed into a stream pipeline."

5. What a stream pipeline consists of

- a data source: an array, a collection, a generator function, an I/O channel, etc.;
- zero or more intermediate operations, each transforming one stream into another new stream (such as Stream#filter(Predicate));
- a terminal operation, which produces a result or a side effect (modifying the attributes or state of stream elements), such as Stream#count() or Stream#forEach(Consumer).

6. Consuming a stream

Streams are lazy: computation on the source data is performed only when the terminal operation is initiated, and source elements are consumed only as needed. Without a terminal operation, none of the intermediate operations execute. A stream pipeline runs only once: the intermediate operations are packed into a container inside the stream, and invoking the terminal operation triggers that chain of intermediate operations, applying the intermediate business logic to each element. Once created, a stream can be consumed only once; consuming it again throws

java.lang.IllegalStateException: stream has already been operated upon or closed

Also, unless the source was explicitly designed for concurrent modification (such as a ConcurrentHashMap), modifying the stream source while it is being queried may produce unexpected or erroneous behavior.

7. Correct behavior of lambda expressions

To preserve correct behavior, the behavioral parameters passed to stream operations must be non-interfering (they do not modify the stream source) and, in most cases, stateless (their result should not depend on any state that might change during execution of the stream pipeline).

8. Streams vs. collections

Collections and streams, while bearing some superficial similarities, have different goals. Collections are primarily concerned with the efficient management of, and access to, their elements. By contrast, streams do not provide a means to directly access or manipulate their elements; instead they declaratively describe a source and the computational operations to be performed in aggregate on that source. If the provided stream operations do not offer the desired functionality, the iterator() and spliterator() operations can be used to perform a controlled traversal.

9. A MapReduce over the stream

Sum twice the value of every element in the collection. Here, each reduce step on an Integer triggers that element's map step once:

```java
System.out.println(integerList.stream().map(i -> 2 * i).reduce(0, Integer::sum));
```

10. Automatic resource release: implementing AutoCloseable

An AutoCloseable is "an object that may hold resources (such as file or socket handles) until it is closed. The close() method of an AutoCloseable object is called automatically when exiting a try-with-resources block for which the object has been declared in the resource specification header. This construction ensures prompt release, avoiding resource exhaustion exceptions and errors that may otherwise occur."

```java
public interface AutoCloseable {
    void close() throws Exception;
}
```

Implementing AutoCloseable lets a try-with-resources block replace try-catch-finally: the resource is closed automatically when the block finishes running.

```java
package com.java.design.java8.Stream.StreamDetail;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author 陈杨
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class AutoCloseableTest implements AutoCloseable {

    public void doSomeThing() {
        System.out.println("method doSomeThing invoked!");
    }

    @Override
    public void close() throws Exception {
        System.out.println("method close invoked!");
    }

    @Test
    public void testAutoCloseable() throws Exception {
        try (AutoCloseableTest autoCloseableTest = new AutoCloseableTest()) {
            autoCloseableTest.doSomeThing();
        }
    }
}
```

11. BaseStream and close handlers

Stream extends BaseStream (public interface Stream<T> extends BaseStream<T, Stream<T>>), and BaseStream<T, S extends BaseStream<T, S>> extends AutoCloseable. Its methods, with the key points of their documentation:

- iterator() and spliterator() return, respectively, an iterator and a spliterator for the elements of this stream.
- isParallel() returns whether this stream, if a terminal operation were to be executed, would execute in parallel. Calling it after invoking a terminal operation may yield unpredictable results, so it should be used before the terminal operation.
- sequential() returns an equivalent sequential stream; it may return itself, either because the stream was already sequential or because the underlying stream state was modified to be sequential. parallel() and unordered() behave the same way for parallel and unordered streams.
- onClose(Runnable closeHandler) returns an equivalent stream with an additional close handler. Close handlers run when close() is called on the stream, and execute in the order they were added. All close handlers run, even if earlier ones throw exceptions. If any close handler throws, the first exception thrown is relayed to the caller of close(), and any remaining exceptions are added to it as suppressed exceptions — unless one of them is the very same exception object as the first, since an exception cannot suppress itself.
- close() closes the stream, causing all close handlers for the stream pipeline to be called.

```java
package com.java.design.java8.Stream.StreamDetail.BaseStreamDetail;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

/**
 * @author 陈杨
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class BaseStreamDetail {

    private List<String> list;
    private NullPointerException myException;

    @Before
    public void init() {
        list = Arrays.asList("Kirito", "Asuna", "Illyasviel", "Sakura");
        myException = new NullPointerException("my NullPointerException");
    }

    @Test
    public void testBaseStreamDetail() {
        try (Stream<String> stream = list.stream()) {
            stream.onClose(() -> {
                System.out.println("close handler first");
                throw myException;
            }).onClose(() -> {
                System.out.println("close handler second");
                throw myException;
            }).onClose(() -> {
                System.out.println("close handler third");
                throw myException;
            }).forEach(System.out::println);
        }
    }
}
```

12. Test results

testStreamDetail prints:

```
Generating a stream with Stream.generate
2742c0e9-7bdb-4c7e-88c9-b5d94684215c
90
```

testAutoCloseable prints:

```
method doSomeThing invoked!
method close invoked!
```

testBaseStreamDetail prints:

```
Kirito
Asuna
Illyasviel
Sakura
close handler first
close handler second
close handler third
java.lang.NullPointerException: my NullPointerException

Process finished with exit code -1
```

All three close handlers ran, and because they all threw the same exception object, only that one NullPointerException is reported: an exception cannot suppress itself.
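The laziness and consume-once rules described above are easy to check empirically. Below is a minimal, self-contained sketch (my own, not from the article) that counts how often a map step actually runs before and after the terminal operation, and shows the IllegalStateException raised by a second terminal operation:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazinessDemo {

    // Counts how many times the map lambda actually runs
    static final AtomicInteger mapCalls = new AtomicInteger();

    static Stream<Integer> pipeline() {
        // Only describes the computation; nothing runs yet
        return Arrays.asList(1, 2, 3).stream()
                .map(i -> {
                    mapCalls.incrementAndGet();
                    return i * 2;
                });
    }

    public static void main(String[] args) {
        Stream<Integer> s = pipeline();
        // No terminal operation yet: the map step has not executed
        System.out.println("map calls before terminal op: " + mapCalls.get()); // 0

        // The terminal operation pulls each element through the pipeline
        int sum = s.reduce(0, Integer::sum);
        System.out.println("sum = " + sum);                                    // 12
        System.out.println("map calls after terminal op: " + mapCalls.get());  // 3

        // A stream can be consumed only once
        try {
            s.count();
        } catch (IllegalStateException e) {
            System.out.println("second terminal op: " + e.getMessage());
        }
    }
}
```

The same behavior holds for any terminal operation, including forEach, collect, and toList.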

February 20, 2019 · 7 min · jiezi

Streams and Lambda Expressions (6): SpliteratorDetail

package com.java.design.java8.Stream.StreamDetail.BaseStreamDetail;import org.junit.Before;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import java.util.Arrays;import java.util.List;import java.util.function.Consumer;import java.util.function.IntConsumer;/** * @author 陈杨 /@SpringBootTest@RunWith(SpringRunner.class)public class SpliteratorDetail { private IntConsumer intConsumer; private Consumer consumer; private List<String> list; @Before public void init() { intConsumer = System.out::println; consumer = System.out::println; list = Arrays.asList(“Kirito”, “Asuna”, “Sinon”, “Yuuki”, “Alice”); } private void action(IntConsumer intConsumer) { intConsumer.accept(100); } @Test public void testSpliteratorDetail() {一、流的创建–源(集合)// 一、流的创建–源(集合)/Collection集合默认方法 list.stream()defaultStream<E> stream () { return StreamSupport.stream(spliterator(), false);}@OverridedefaultSpliterator<E> spliterator () { return Spliterators.spliterator(this, 0);}public static <T> Spliterator<T> spliterator(Collection<? 
extends T> c,int characteristics) { return new IteratorSpliterator<>(Objects.requireNonNull(c), characteristics);}/// Collector 接口 与 Collectors 静态类实现// Spliterator 接口 与 Spliterators 静态类实现二、Spliterator 接口// 二、Spliterator 接口// Spliterator 接口// 对数据源中元素进行遍历或分区// An object for traversing and partitioning elements of a source.// 延迟绑定数据源// 绑定时机:首次遍历、切分、查询大小 而不是在创建时// A <em>late-binding</em> Spliterator binds to the source of elements at the// point of first traversal, first split, or first query for estimated size,// rather than at the time the Spliterator is created.// 非延迟绑定数据源// 绑定时机:Spliterator创建时 或Spliterator的方法首次调用// A Spliterator that is not <em>late-binding</em> binds to the source of elements// at the point of construction or first invocation of any method.// Spliterator 与 Iterator 的区别://// Spliterator 优势:通过分解和单元素迭代 支持串行与并行// 比Iterator迭代通过hasNext与next性能更好// Spliterators, like {@code Iterator}s, are for traversing the elements of a source.// The Spliterator API was designed to support efficient parallel traversal// in addition to sequential traversal, by supporting decomposition as well as single-element iteration.// In addition, the protocol for accessing elements via a Spliterator is designed to impose// smaller per-element overhead than {@code Iterator}, and to avoid the inherent// race involved in having separate methods for {@code hasNext()} and {@code next()}.三、Spliterator特性值/ public interface Spliterator<T> {// 三、Spliterator特性值 * Characteristic value signifying that an encounter order is defined for * elements. If so, this Spliterator guarantees that method * {@link #trySplit} splits a strict prefix of elements, that method 分割前后对元素加严格前缀 * {@link #tryAdvance} steps by one element in prefix order, and that 按照元素的顺序前缀遍历 * {@link #forEachRemaining} performs actions in encounter order. 对剩余元素按照相遇顺序执行action * * <p>A {@link Collection} has an encounter order if the corresponding * {@link Collection#iterator} documents an order. 
If so, the encounter * order is the same as the documented order. Otherwise, a collection does * not have an encounter order. * 集合是有序的,则文档是有序的 * 集合是无序的,则文档是无序的 * * @apiNote Encounter order is guaranteed to be ascending index order for * any {@link List}. But no order is guaranteed for hash-based collections * such as {@link HashSet}. Clients of a Spliterator that reports * {@code ORDERED} are expected to preserve ordering constraints in * non-commutative parallel computations. * 基于索引升序的List 排序–>有序 * 基于Hash散列的HashSet 排序–>无序 * 非并发情况下期望要保留 有序集合中 元素的顺序 以返还给客户端调用者 public static final int ORDERED = 0x00000010; * Characteristic value signifying that, for each pair of * encountered elements {@code x, y}, {@code !x.equals(y)}. This * applies for example, to a Spliterator based on a {@link Set}. 基于Set的去重DISTINCT public static final int DISTINCT = 0x00000001; * Characteristic value signifying that encounter order follows a defined * sort order. If so, method {@link #getComparator()} returns the associated * Comparator, or {@code null} if all elements are {@link Comparable} and * are sorted by their natural ordering. * * <p>A Spliterator that reports {@code SORTED} must also report * {@code ORDERED}. * 已排序的一定是有序的 * * @apiNote The spliterators for {@code Collection} classes in the JDK that * implement {@link NavigableSet} or {@link SortedSet} report {@code SORTED}. * 如果基于集合的spliterator实现了NavigableSet或SortedSet接口 则为SORTED public static final int SORTED = 0x00000004; * Characteristic value signifying that the value returned from * {@code estimateSize()} prior to traversal or splitting represents a * finite size that, in the absence of structural source modification, * represents an exact count of the number of elements that would be * encountered by a complete traversal. * 源中元素个数有限 源元素结构特性未被修改 estimateSize能在完整遍历过程中 精准计算 public static final int SIZED = 0x00000040; * Characteristic value signifying that the source guarantees that * encountered elements will not be {@code null}. 
(This applies, * for example, to most concurrent collections, queues, and maps.) * 源中元素都不为null public static final int NONNULL = 0x00000100; * Characteristic value signifying that the element source cannot be * structurally modified; that is, elements cannot be added, replaced, or * removed, so such changes cannot occur during traversal. A Spliterator * that does not report {@code IMMUTABLE} or {@code CONCURRENT} is expected * to have a documented policy (for example throwing * {@link ConcurrentModificationException}) concerning structural * interference detected during traversal. * 源中元素结构不可变 * 源中元素在遍历过程中 不能被 添加 替换(包含修改) 删除 * 如果遍历时 发送元素结构发生改变 则不能表示为IMMUTABLE或CONCURRENT 抛出ConcurrentModificationException public static final int IMMUTABLE = 0x00000400; * Characteristic value signifying that the element source may be safely * concurrently modified (allowing additions, replacements, and/or removals) * by multiple threads without external synchronization. If so, the * Spliterator is expected to have a documented policy concerning the impact * of modifications during traversal. * * <p>A top-level Spliterator should not report both {@code CONCURRENT} and * {@code SIZED}, since the finite size, if known, may change if the source * is concurrently modified during traversal. Such a Spliterator is * inconsistent and no guarantees can be made about any computation using * that Spliterator. Sub-spliterators may report {@code SIZED} if the * sub-split size is known and additions or removals to the source are not * reflected when traversing. * * <p>A top-level Spliterator should not report both {@code CONCURRENT} and * {@code IMMUTABLE}, since they are mutually exclusive. Such a Spliterator * is inconsistent and no guarantees can be made about any computation using * that Spliterator. Sub-spliterators may report {@code IMMUTABLE} if * additions or removals to the source are not reflected when traversing. 
* * @apiNote Most concurrent collections maintain a consistency policy * guaranteeing accuracy with respect to elements present at the point of * Spliterator construction, but possibly not reflecting subsequent * additions or removals. * 顶层的Spliterator不能同时拥有CONCURRENT和SIZED特性 * 并发时可能存在对源进行添加、替换(修改)、删除 以改变元素个数 * 顶层的Spliterator不能同时拥有CONCURRENT和IMMUTABLE特性 * 这两种特性是互斥的 * 大多数并发集合都保持一致性策略,以确保在拆分器构造点存在的元素的准确性,但可能不反映随后的添加或删除 public static final int CONCURRENT = 0x00001000; * Characteristic value signifying that all Spliterators resulting from * {@code trySplit()} will be both {@link #SIZED} and {@link #SUBSIZED}. * (This means that all child Spliterators, whether direct or indirect, will * be {@code SIZED}.) * * <p>A Spliterator that does not report {@code SIZED} as required by * {@code SUBSIZED} is inconsistent and no guarantees can be made about any * computation using that Spliterator. * * @apiNote Some spliterators, such as the top-level spliterator for an * approximately balanced binary tree, will report {@code SIZED} but not * {@code SUBSIZED}, since it is common to know the size of the entire tree * but not the exact sizes of subtrees. * 顶层二叉树是SIZED 但不是SUBSIZED 因为不知道子树的大小 * 从trySplit返回的子Spliterator都是SIZED 和 SUBSIZED public static final int SUBSIZED = 0x00004000;四、Spliterator方法// 四、Spliterator方法 * If a remaining element exists, performs the given action on it, * returning {@code true}; else returns {@code false}. If this * Spliterator is {@link #ORDERED} the action is performed on the * next element in encounter order. Exceptions thrown by the * action are relayed to the caller. * 尝试遍历: 如果有下一个元素 就对其执行action 如果是有序的 按照元素相遇顺序 对其执行action 如果有异常 将异常信息返回给方法调用者 tryAdvance() 完成了 Iterator的hasNext()与next() boolean tryAdvance(Consumer<? super T> action); * Performs the given action for each remaining element, sequentially in * the current thread, until all elements have been processed or the action * throws an exception. 
If this Spliterator is {@link #ORDERED}, actions * are performed in encounter order. Exceptions thrown by the action * are relayed to the caller. 按顺序遍历剩余元素 并对每个元素执行action 直到遍历结束 将异常信息返回给方法调用者 default void forEachRemaining(Consumer<? super T> action) { do { } while (tryAdvance(action)); } * If this spliterator can be partitioned, returns a Spliterator * covering elements, that will, upon return from this method, not * be covered by this Spliterator. * * <p>If this Spliterator is {@link #ORDERED}, the returned Spliterator * must cover a strict prefix of the elements. * * <p>Unless this Spliterator covers an infinite number of elements, * repeated calls to {@code trySplit()} must eventually return {@code null}. * Upon non-null return: * <ul> * <li>the value reported for {@code estimateSize()} before splitting, * must, after splitting, be greater than or equal to {@code estimateSize()} * for this and the returned Spliterator; and</li> * <li>if this Spliterator is {@code SUBSIZED}, then {@code estimateSize()} * for this spliterator before splitting must be equal to the sum of * {@code estimateSize()} for this and the returned Spliterator after * splitting.</li> * </ul> * * <p>This method may return {@code null} for any reason, * including emptiness, inability to split after traversal has * commenced, data structure constraints, and efficiency * considerations. * * @apiNote * An ideal {@code trySplit} method efficiently (without * traversal) divides its elements exactly in half, allowing * balanced parallel computation. Many departures from this ideal * remain highly effective; for example, only approximately * splitting an approximately balanced tree, or for a tree in * which leaf nodes may contain either one or two elements, * failing to further split these nodes. However, large * deviations in balance and/or overly inefficient {@code * trySplit} mechanics typically result in poor parallel * performance. 
尝试对Spliterator中元素进行trySplit 若能进行拆分,则返回一个新的Spliterator对象 装载已分割的元素 如果分割前有序,分割后也是有序的 分割结果不为null: 进行有限分割后 最终能得到非null元素 分割结果为null: 对有限元素个数的分割:进行无限分割 分割前元素个数为null 遍历开始后无法拆分 数据结构约束 性能考量 Spliterator<T> trySplit(); * Returns an estimate of the number of elements that would be * encountered by a {@link #forEachRemaining} traversal, or returns {@link * Long#MAX_VALUE} if infinite, unknown, or too expensive to compute. * * <p>If this Spliterator is {@link #SIZED} and has not yet been partially * traversed or split, or this Spliterator is {@link #SUBSIZED} and has * not yet been partially traversed, this estimate must be an accurate * count of elements that would be encountered by a complete traversal. * Otherwise, this estimate may be arbitrarily inaccurate, but must decrease * as specified across invocations of {@link #trySplit}. * * @apiNote * Even an inexact estimate is often useful and inexpensive to compute. * For example, a sub-spliterator of an approximately balanced binary tree * may return a value that estimates the number of elements to be half of * that of its parent; if the root Spliterator does not maintain an * accurate count, it could estimate size to be the power of two * corresponding to its maximum depth. 估算元素数量(即将遍历的元素个数) 如果元素数量无限 未知 或计算成本很昂贵 返回Long.Max_Value 如果Spliterator是一个SIZED或SUBSIZED estimate则是完整遍历所需要的值(accurate精确) long estimateSize(); * Convenience method that returns {@link #estimateSize()} if this * Spliterator is {@link #SIZED}, else {@code -1}. characteristic.SIZED –>返回确定的大小 否则为 -1L default long getExactSizeIfKnown() { return (characteristics() & SIZED) == 0 ? -1L : estimateSize(); } * Returns a set of characteristics of this Spliterator and its * elements. The result is represented as ORed values from {@link * #ORDERED}, {@link #DISTINCT}, {@link #SORTED}, {@link #SIZED}, * {@link #NONNULL}, {@link #IMMUTABLE}, {@link #CONCURRENT}, * {@link #SUBSIZED}. 
Repeated calls to {@code characteristics()} on * a given spliterator, prior to or in-between calls to {@code trySplit}, * should always return the same result. * * <p>If a Spliterator reports an inconsistent set of * characteristics (either those returned from a single invocation * or across multiple invocations), no guarantees can be made * about any computation using this Spliterator. * * @apiNote The characteristics of a given spliterator before splitting * may differ from the characteristics after splitting. For specific * examples see the characteristic values {@link #SIZED}, {@link #SUBSIZED} * and {@link #CONCURRENT}. * * @return a representation of characteristics 返回Spliterator与其元素的一个特性值标识 在分割期间或之前 其元素的特性不变 分割前后若元素的特性发生了变更 对其进行计算行为是不能受到保证的 int characteristics(); * Returns {@code true} if this Spliterator’s {@link * #characteristics} contain all of the given characteristics. 判断是否包含此元素特性 default boolean hasCharacteristics(int characteristics) { return (characteristics() & characteristics) == characteristics; } * If this Spliterator’s source is {@link #SORTED} by a {@link Comparator}, * returns that {@code Comparator}. If the source is {@code SORTED} in * {@linkplain Comparable natural order}, returns {@code null}. Otherwise, * if the source is not {@code SORTED}, throws {@link IllegalStateException}. 如果source是有序的: 如果是按照比较器进行排序 则返回该比较器 如果是Comparable natural order 则返回null 如果source是无序的 抛出IllegalStateException异常 default Comparator<? super T> getComparator() { throw new IllegalStateException(); } * A Spliterator specialized for primitive values. * 针对于原生类型值的特化分割器 * * @param <T> the type of elements returned by this Spliterator. * The type must be a wrapper type for a primitive type, * such as {@code Integer} for the primitive {@code int} type. * @param <T_CONS> the type of primitive consumer. The type must be a * primitive specialization of {@link java.util.function.Consumer} for * {@code T}, such as {@link java.util.function.IntConsumer} for {@code Integer}. 
* @param <T_SPLITR> the type of primitive Spliterator. The type must be * a primitive specialization of Spliterator for {@code T}, such as * {@link Spliterator.OfInt} for {@code Integer}. * * @see Spliterator.OfInt * @see Spliterator.OfLong * @see Spliterator.OfDouble * @since 1.8 * T Spliterator返回的元素类型:原生包装类型 * T_CONS primitive consumer :java.util.function.IntConsumer对Integer的原生特化 * T_SPLITR primitive Spliterator :Spliterator.OfInt对Integer的原生特化 * public interface OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>> extends Spliterator<T> { @Override T_SPLITR trySplit(); @SuppressWarnings(“overloads”) boolean tryAdvance(T_CONS action); @SuppressWarnings(“overloads”) default void forEachRemaining(T_CONS action) { do { } while (tryAdvance(action)); } } * A Spliterator specialized for {@code int} values. * @since 1.8 public interface OfInt extends OfPrimitive<Integer, IntConsumer, OfInt> { @Override OfInt trySplit(); @Override boolean tryAdvance(IntConsumer action); @Override default void forEachRemaining(IntConsumer action) { do { } while (tryAdvance(action)); }五、Consumer 与 IntConsumer、LongConsumer、DoubleConsumer 五、Consumer 与 IntConsumer、LongConsumer、DoubleConsumer // Consumer 与 IntConsumer 为什么能进行强制类型转换? // Consumer 与 IntConsumer 之间没有继承关系 层次上无关系 // Consumer 与 IntConsumer 当传入的参数是整型int,Integer时 会自动进行装箱拆箱 // ((IntConsumer) action::accept) 是Lambda表达式 // Lambda表达式 是一种匿名函数 没有方法声明 具有上下文自动推测类型功能 * {@inheritDoc} * @implSpec * If the action is an instance of {@code IntConsumer} then it is cast * to {@code IntConsumer} and passed to * {@link #tryAdvance(java.util.function.IntConsumer)}; otherwise * the action is adapted to an instance of {@code IntConsumer}, by * boxing the argument of {@code IntConsumer}, and then passed to * {@link #tryAdvance(java.util.function.IntConsumer)}. @Override default boolean tryAdvance(Consumer<? 
super Integer> action) { if (action instanceof IntConsumer) { return tryAdvance((IntConsumer) action); } else { if (Tripwire.ENABLED) Tripwire.trip(getClass(), “{0} calling Spliterator.OfInt.tryAdvance((IntConsumer) action::accept)”); return tryAdvance((IntConsumer) action::accept); } } * {@inheritDoc} * @implSpec * If the action is an instance of {@code IntConsumer} then it is cast * to {@code IntConsumer} and passed to * {@link #forEachRemaining(java.util.function.IntConsumer)}; otherwise * the action is adapted to an instance of {@code IntConsumer}, by * boxing the argument of {@code IntConsumer}, and then passed to * {@link #forEachRemaining(java.util.function.IntConsumer)}. @Override default void forEachRemaining(Consumer<? super Integer> action) { if (action instanceof IntConsumer) { forEachRemaining((IntConsumer) action); } else { if (Tripwire.ENABLED) Tripwire.trip(getClass(), “{0} calling Spliterator.OfInt.forEachRemaining((IntConsumer) action::accept)”); forEachRemaining((IntConsumer) action::accept); } } } * A Spliterator specialized for {@code long} values. * @since 1.8 public interface OfLong extends OfPrimitive<Long, LongConsumer, OfLong> { @Override OfLong trySplit(); @Override boolean tryAdvance(LongConsumer action); @Override default void forEachRemaining(LongConsumer action) { do { } while (tryAdvance(action)); } * {@inheritDoc} * @implSpec * If the action is an instance of {@code LongConsumer} then it is cast * to {@code LongConsumer} and passed to * {@link #tryAdvance(java.util.function.LongConsumer)}; otherwise * the action is adapted to an instance of {@code LongConsumer}, by * boxing the argument of {@code LongConsumer}, and then passed to * {@link #tryAdvance(java.util.function.LongConsumer)}. @Override default boolean tryAdvance(Consumer<? 
super Long> action) { if (action instanceof LongConsumer) { return tryAdvance((LongConsumer) action); } else { if (Tripwire.ENABLED) Tripwire.trip(getClass(), “{0} calling Spliterator.OfLong.tryAdvance((LongConsumer) action::accept)”); return tryAdvance((LongConsumer) action::accept); } } * {@inheritDoc} * @implSpec * If the action is an instance of {@code LongConsumer} then it is cast * to {@code LongConsumer} and passed to * {@link #forEachRemaining(java.util.function.LongConsumer)}; otherwise * the action is adapted to an instance of {@code LongConsumer}, by * boxing the argument of {@code LongConsumer}, and then passed to * {@link #forEachRemaining(java.util.function.LongConsumer)}. @Override default void forEachRemaining(Consumer<? super Long> action) { if (action instanceof LongConsumer) { forEachRemaining((LongConsumer) action); } else { if (Tripwire.ENABLED) Tripwire.trip(getClass(), “{0} calling Spliterator.OfLong.forEachRemaining((LongConsumer) action::accept)”); forEachRemaining((LongConsumer) action::accept); } } } * A Spliterator specialized for {@code double} values. * @since 1.8 public interface OfDouble extends OfPrimitive<Double, DoubleConsumer, OfDouble> { @Override OfDouble trySplit(); @Override boolean tryAdvance(DoubleConsumer action); @Override default void forEachRemaining(DoubleConsumer action) { do { } while (tryAdvance(action)); } * {@inheritDoc} * @implSpec * If the action is an instance of {@code DoubleConsumer} then it is * cast to {@code DoubleConsumer} and passed to * {@link #tryAdvance(java.util.function.DoubleConsumer)}; otherwise * the action is adapted to an instance of {@code DoubleConsumer}, by * boxing the argument of {@code DoubleConsumer}, and then passed to * {@link #tryAdvance(java.util.function.DoubleConsumer)}. @Override default boolean tryAdvance(Consumer<? 
super Double> action) { if (action instanceof DoubleConsumer) { return tryAdvance((DoubleConsumer) action); } else { if (Tripwire.ENABLED) Tripwire.trip(getClass(), “{0} calling Spliterator.OfDouble.tryAdvance((DoubleConsumer) action::accept)”); return tryAdvance((DoubleConsumer) action::accept); } } * {@inheritDoc} * @implSpec * If the action is an instance of {@code DoubleConsumer} then it is * cast to {@code DoubleConsumer} and passed to * {@link #forEachRemaining(java.util.function.DoubleConsumer)}; * otherwise the action is adapted to an instance of * {@code DoubleConsumer}, by boxing the argument of * {@code DoubleConsumer}, and then passed to * {@link #forEachRemaining(java.util.function.DoubleConsumer)}. @Override default void forEachRemaining(Consumer<? super Double> action) { if (action instanceof DoubleConsumer) { forEachRemaining((DoubleConsumer) action); } else { if (Tripwire.ENABLED) Tripwire.trip(getClass(), “{0} calling Spliterator.OfDouble.forEachRemaining((DoubleConsumer) action::accept)”); forEachRemaining((DoubleConsumer) action::accept); } } }}/六、Consumer 与 IntConsumer 的强制类型转换测试// 六、Consumer 与 IntConsumer 的强制类型转换测试// 传入面向对象 对象this.action(intConsumer);// 传入Lambda表达式 函数式编程this.action(intConsumer::accept);this.action(value -> intConsumer.accept(value));this.action(consumer::accept);this.action(value -> consumer.accept(value));// 面向对象强制类型转换 报错java.lang.ClassCastException// this.action((IntConsumer) consumer);// this.action(((IntConsumer) consumer)::accept);// this.action(t -> ((IntConsumer) consumer).accept(t));// 函数式编程强制类型转换 Lambda表达式没变this.action((IntConsumer) consumer::accept);this.action((IntConsumer) (t -> consumer.accept(t)));this.action((IntConsumer) t -> consumer.accept(t));七、Iterator-based Spliterators 与 StreamSupport底层实现// 七、Iterator-based Spliterators 与 StreamSupport底层实现// Iterator-based Spliterators/ * A Spliterator using a given Iterator for element * operations. 
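The Consumer/IntConsumer cast behavior exercised in section 六 above (casting the Consumer object itself throws ClassCastException, while a method reference or lambda adapts it cleanly) can be reproduced with a small standalone sketch. The class and method names here are illustrative stand-ins for the article's test harness, not the original code:

```java
import java.util.function.Consumer;
import java.util.function.IntConsumer;

public class Main {
    // Stand-in for the article's this.action(...): accepts only the primitive specialization.
    static void action(IntConsumer ic) { ic.accept(100); }

    // Returns true iff casting the Consumer object itself to IntConsumer blows up at runtime.
    static boolean directCastFails(Consumer<Integer> consumer) {
        try {
            action((IntConsumer) consumer);   // runtime class implements Consumer, not IntConsumer
            return false;
        } catch (ClassCastException e) {
            return true;                      // java.lang.ClassCastException, as noted in the article
        }
    }

    public static void main(String[] args) {
        Consumer<Integer> consumer = i -> System.out.println("consumer got " + i);

        // Adapting via method reference / lambda compiles to a NEW IntConsumer
        // whose body merely delegates to the Consumer, so these always work:
        action(consumer::accept);
        action(i -> consumer.accept(i));

        System.out.println(directCastFails(consumer)); // true
    }
}
```

The cast compiles (interface-to-interface casts are always allowed at compile time) but fails at runtime, which is exactly why the commented-out lines in the article throw.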
The spliterator implements {@code trySplit} to * permit limited parallelism. * spliterator利用trySplit实现有限的并行化操作 * * static class IteratorSpliterator<T> implements Spliterator<T> {} // * Low-level utility methods for creating and manipulating streams. * 用于创建和操作流的底层实用程序方法 * * <p>This class is mostly for library writers presenting stream views * of data structures; most static stream methods intended for end users are in * the various {@code Stream} classes. * StreamSupport提供数据结构的流视图的library 大多数为终端用户使用的静态流方法在Stream类中 * * @since 1.8 * public final class StreamSupport { * Creates a new sequential or parallel {@code Stream} from a * {@code Spliterator}. * * * <p>The spliterator is only traversed, split, or queried for estimated * size after the terminal operation of the stream pipeline commences. * 仅在流管道的终端操作开始后,才遍历、拆分或查询spliterator的估计大小。 * * <p>It is strongly recommended the spliterator report a characteristic of * {@code IMMUTABLE} or {@code CONCURRENT}, or be * <a href="../Spliterator.html#binding">late-binding</a>. Otherwise, * {@link #stream(java.util.function.Supplier, int, boolean)} should be used * to reduce the scope of potential interference with the source. See * <a href=“package-summary.html#NonInterference”>Non-Interference</a> for * more details. * 强烈建议对spliterator设置characteristic(IMMUTABLE CONCURRENT late-binding) * 以减少潜在的干扰源范围 * public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); }}/八、流源分析 // 八、流源分析 / 流源的创建 Abstract base class for an intermediate pipeline stage or pipeline source stage implementing whose elements are of type {@code U}. 
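The `StreamSupport.stream` entry point quoted in section 七 can be driven directly. This sketch builds a sequential pipeline from the spliterator of a plain array (method name `upper` is illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class Main {
    static List<String> upper(String[] data) {
        // Arrays.spliterator hands back a SIZED ArraySpliterator over the array.
        Spliterator<String> sp = Arrays.spliterator(data);
        // parallel = false: StreamSupport.stream builds a sequential ReferencePipeline.Head.
        return StreamSupport.stream(sp, false)
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(upper(new String[]{"Kirito", "Asuna"})); // [KIRITO, ASUNA]
    }
}
```

Note that the spliterator is not traversed until the terminal `collect` runs, matching the javadoc quoted above.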
抽象基类:用于实现其元素类型为{@code U}的中间管道阶段或管道源阶段 ReferencePipeline 操作引用类型 (将源阶段 与 [0,n)个中间操作阶段 看做一个对象) * @param <P_IN> type of elements in the upstream source * @param <P_OUT> type of elements in produced by this stage abstract class ReferencePipeline<P_IN, P_OUT> extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>> implements Stream<P_OUT> { * 源阶段 * Source stage of a ReferencePipeline. * * @param <E_IN> type of elements in the upstream source * @param <E_OUT> type of elements in produced by this stage ReferencePipeline中静态内部类Head static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> * * 注意: * 流本身不持有数据 * 数据的持有者:流的数据源(集合、数组等) * 流关注对数据的计算 // 抽象基类: 抽象管道AbstractPipeline 流接口及 其特化的核心实现 管理流管道 创建 与 评估 * Abstract base class for “pipeline” classes, which are the core * implementations of the Stream interface and its primitive specializations. * Manages construction and evaluation of stream pipelines. * * <p>A concrete intermediate stage is generally built from an * {@code AbstractPipeline}, a shape-specific pipeline class which extends it * (e.g., {@code IntPipeline}) which is also abstract, and an operation-specific * concrete class which extends that. {@code AbstractPipeline} contains most of * the mechanics of evaluating the pipeline, and implements methods that will be * used by the operation; the shape-specific classes add helper methods for * dealing with collection of results into the appropriate shape-specific * containers. * * * <p>After chaining a new intermediate operation, or executing a terminal * operation, the stream is considered to be consumed, and no more intermediate * or terminal operations are permitted on this stream instance. * 在链式添加中间操作或一个终止操作后 流视做被消费 * 流只能被消费一次 已消费–>不允许在此流实例中存在更多的中间操作或终止操作 * * @implNote * <p>For sequential streams, and parallel streams without * <a href=“package-summary.html#StreamOps”>stateful intermediate * operations</a>, parallel streams, pipeline evaluation is done in a single * pass that “jams” all the operations together. 
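The "流只能被消费一次" rule stated in the AbstractPipeline javadoc above is easy to demonstrate: chaining any further operation after a terminal operation throws `IllegalStateException` (minimal sketch):

```java
import java.util.stream.Stream;

public class Main {
    static boolean reuseFails() {
        Stream<Integer> s = Stream.of(1, 2, 3);
        s.count();                 // terminal operation: the pipeline is now consumed
        try {
            s.map(i -> i + 1);     // any further intermediate or terminal op is illegal
            return false;
        } catch (IllegalStateException e) {
            return true;           // "stream has already been operated upon or closed"
        }
    }

    public static void main(String[] args) {
        System.out.println(reuseFails()); // true
    }
}
```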
For parallel streams with * stateful operations, execution is divided into segments, where each * stateful operations marks the end of a segment, and each segment is * evaluated separately and the result used as the input to the next * segment. In all cases, the source data is not consumed until a terminal * operation begins. * 串行流 与 无状态的并行流 * 流的消费 是将中间的操作进行“jams”(打包放一起)对流中每个元素执行action–>single pass * * 有状态的并行流 * 执行分成segments 分别对segment执行有状态操作 并将其结果作为下一个segment输入 * * 在任何情况下,有且只有在一个终止操作被调用时 流真正被消费 abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> AbstractPipeline的构造方法 AbstractPipeline(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel) {} AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {} 同一时间构造同一个AbstractPipeline 有且只有调用AbstractPipeline构造方法之一 sourceSpliterator与sourceSupplier 同一时间只能存在其一 当流被消费后 若not null 要设置为null 只能被消费一次 private Spliterator<?> sourceSpliterator; private Supplier<? extends Spliterator<?>> sourceSupplier; // 针对于流源的foreach Optimized sequential terminal operations for the head of the pipeline @Override public void forEach(Consumer<? super E_OUT> action) { if (!isParallel()) { sourceStageSpliterator().forEachRemaining(action); } else { super.forEach(action); } } Terminal operations from Stream @Override public void forEach(Consumer<? 
super P_OUT> action) { evaluate(ForEachOps.makeRef(action, false)); } /九、Array.asList()流源遍历注意事项 // 九、Array.asList()流源遍历注意事项 / 为什么 未调用IteratorSpliterator.forEachRemaining() list.stream().forEach(System.out::println); 执行过程分析 Arrays.asList() private static class ArrayList<E> extends AbstractList<E> implements RandomAccess, java.io.Serializable{ private final E[] a; ArrayList(E[] array) { a = Objects.requireNonNull(array); } @Override public Spliterator<E> spliterator() { return Spliterators.spliterator(a, Spliterator.ORDERED); } } public static <T> Spliterator<T> spliterator(Object[] array, int additionalCharacteristics) { return new ArraySpliterator<>(Objects.requireNonNull(array), additionalCharacteristics); } @Override public void forEach(Consumer<? super E_OUT> action) { if (!isParallel()) { sourceStageSpliterator().forEachRemaining(action); } else { super.forEach(action); } } @SuppressWarnings(“unchecked”) @Override public void forEachRemaining(Consumer<? super T> action) { Object[] a; int i, hi; // hoist accesses and checks from loop if (action == null) throw new NullPointerException(); if ((a = array).length >= (hi = fence) && (i = index) >= 0 && i < (index = hi)) { do { action.accept((T)a[i]); } while (++i < hi); } }*/ System.out.println(list.getClass()); // Arrays中静态内部类ArrayList (class java.util.Arrays$ArrayList) // @Override public Spliterator<E> spliterator(){} // 调用ArraySpliterator.forEachRemaining()实现 list.stream().forEach(System.out::println); // 普通集合遍历 Iterable 中的 forEach // 效率高 list.forEach(System.out::println); }}十、测试结果 . 
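Section 九's point, that `list.stream().forEach` on a sequential source stage goes through `ArraySpliterator.forEachRemaining` while plain `list.forEach` never builds a pipeline at all, can be verified like this:

```java
import java.util.Arrays;
import java.util.List;

public class Main {
    static String backingClass() {
        // Arrays.asList returns java.util.Arrays$ArrayList, NOT java.util.ArrayList;
        // its spliterator() is an ArraySpliterator over the backing array.
        return Arrays.asList("Kirito").getClass().getName();
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("Kirito", "Asuna", "Sinon");

        System.out.println(backingClass()); // java.util.Arrays$ArrayList

        // Sequential source-stage forEach: delegates to
        // sourceStageSpliterator().forEachRemaining(action), i.e. the ArraySpliterator loop.
        list.stream().forEach(System.out::println);

        // Plain Iterable forEach: a simple loop over the elements, no stream pipeline built.
        list.forEach(System.out::println);
    }
}
```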
:: Spring Boot :: (v2.1.2.RELEASE)
2019-02-20 18:09:13.662  INFO 2224 --- [ main] c.j.d.j.S.S.B.SpliteratorDetail : Starting SpliteratorDetail on DESKTOP-87RMBG4 with PID 2224 (started by 46250 in E:\IdeaProjects\design)
2019-02-20 18:09:13.663  INFO 2224 --- [ main] c.j.d.j.S.S.B.SpliteratorDetail : No active profile set, falling back to default profiles: default
2019-02-20 18:09:14.133  INFO 2224 --- [ main] c.j.d.j.S.S.B.SpliteratorDetail : Started SpliteratorDetail in 0.653 seconds (JVM running for 1.335)
100
100
100
100
100
100
100
100
class java.util.Arrays$ArrayList
Kirito
Asuna
Sinon
Yuuki
Alice
Kirito
Asuna
Sinon
Yuuki
Alice
...

February 20, 2019 · 18 min · jiezi

Stream流与Lambda表达式(三) 静态工厂类Collectors

/** * @author 陈杨 /@SpringBootTest@RunWith(SpringRunner.class)public class CollectorsDetail { private List<String> names; private List<Student> students; private List<List<String>> snames; @Before public void init() { names = Arrays.asList(“Kirito”, “Asuna”, “Sinon”, “Yuuki”, “Alice”); snames = Arrays.asList(Collections.singletonList(“Kirito”), Collections.singletonList(“Asuna”), Collections.singletonList(“Sinon”), Collections.singletonList(“Yuuki”), Collections.singletonList(“Alice”)); students = new Students().init(); } @Test public void testCollectorsDetail() {一、静态工厂类Collectors 实现方式// 一、静态工厂类Collectors 实现方式// Collectors 静态工厂类 最终由CollectorImpl实现// 1、 通过CollectorImpl实现// 2、 通过reducing()实现—> reducing()底层由CollectorImpl实现// Collectors.toList() 是 Collectors.toCollection()的一种具化表现形式// Collectors.joining() 使用StringBuilder.append 拼接字符串二、静态工厂类Collectors 常用收集器// 二、静态工厂类Collectors 常用收集器/ 返回一个(不可修改)unmodifiable List的ArrayList 按照相遇的顺序encounter order@since 10@SuppressWarnings(“unchecked”)public static <T> Collector<T, ?, List<T>> toUnmodifiableList() { return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add, (left, right) -> { left.addAll(right); return left; }, list -> (List<T>)List.of(list.toArray()), CH_NOID);}// 返回一个(不可修改)unmodifiable Set的HashSet 无序@since 10@SuppressWarnings(“unchecked”)public static <T> Collector<T, ?, Set<T>> toUnmodifiableSet() { return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add, (left, right) -> { if (left.size() < right.size()) { right.addAll(left); return right; } else { left.addAll(right); return left; } }, set -> (Set<T>)Set.of(set.toArray()), CH_UNORDERED_NOID);}// 返回一个flatMapping扁平化mapper处理后 由downstream收集的 累加器处理的结果合并 单线程@since 9public static <T, U, A, R> Collector<T, ?, R> flatMapping(Function<? super T, ? extends Stream<? extends U>> mapper, Collector<? super U, A, R> downstream) { BiConsumer<A, ? 
super U> downstreamAccumulator = downstream.accumulator(); return new CollectorImpl<>(downstream.supplier(), (r, t) -> { try (Stream<? extends U> result = mapper.apply(t)) { if (result != null) result.sequential().forEach(u -> downstreamAccumulator.accept(r, u)); } }, downstream.combiner(), downstream.finisher(), downstream.characteristics());}// 对符合Predicate预期结果 进行过滤filtering 使用downstream 收集元素@since 9public static <T, A, R> Collector<T, ?, R> filtering(Predicate<? super T> predicate, Collector<? super T, A, R> downstream) { BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator(); return new CollectorImpl<>(downstream.supplier(), (r, t) -> { if (predicate.test(t)) { downstreamAccumulator.accept(r, t); } }, downstream.combiner(), downstream.finisher(), downstream.characteristics());}// 使用Map映射key–>keyMapper–>Key value–>valueMapper–>Value 对映射后的结果进行组装entrySet–>Map<Key,Value> (unmodifiable Map)@since 10@SuppressWarnings({“rawtypes”, “unchecked”})public static <T, K, U> Collector<T, ?, Map<K,U>> toUnmodifiableMap(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper) { Objects.requireNonNull(keyMapper, “keyMapper”); Objects.requireNonNull(valueMapper, “valueMapper”); return collectingAndThen( toMap(keyMapper, valueMapper), map -> (Map<K,U>)Map.ofEntries(map.entrySet().toArray(new Map.Entry[0])));}// 使用Map映射key–>keyMapper–>Key value–>valueMapper–>Value mergeFunction 对相同key 映射后的进行Value 合并操作 对映射后的结果进行组装entrySet–>Map<Key,Value> (unmodifiable Map)@since 10@SuppressWarnings({“rawtypes”, “unchecked”})public static <T, K, U> Collector<T, ?, Map<K,U>> toUnmodifiableMap(Function<? super T, ? extends K> keyMapper, Function<? super T, ? 
extends U> valueMapper, BinaryOperator<U> mergeFunction) { Objects.requireNonNull(keyMapper, “keyMapper”); Objects.requireNonNull(valueMapper, “valueMapper”); Objects.requireNonNull(mergeFunction, “mergeFunction”); return collectingAndThen( toMap(keyMapper, valueMapper, mergeFunction, HashMap::new), map -> (Map<K,U>)Map.ofEntries(map.entrySet().toArray(new Map.Entry[0])));}/// Collectors 收集器System.out.println("—————————————–\n");// 使用ArrayList集合 按照流中元素排列先后顺序 进行添加操作List<String> unmodifiableList = names.stream().collect(Collectors.toUnmodifiableList());System.out.println(unmodifiableList);System.out.println("—————————————\n");// 使用HashSet集合 对流中元素顺序 进行添加操作Set<String> unmodifiableSet = names.stream().collect(Collectors.toUnmodifiableSet());System.out.println(unmodifiableSet);System.out.println("—————————————\n");// 将集合扁平化展开 对其中的字符串的每个字母 进行大写转换 使用ArrayList对转换后的结果进行收集List<String> strings = snames.stream() .collect(Collectors.flatMapping(list -> list.stream().map(String::toUpperCase), Collectors.toList()));System.out.println(strings);System.out.println("—————————————\n");// 对流中元素进行遍历 对符合预期要求的元素 使用ArrayList集合存放List<String> filteringPredicate = names.stream().collect(Collectors.filtering(“Kirito”::equals, Collectors.toList()));System.out.println(filteringPredicate);System.out.println("—————————————\n");// 对流中元素进行遍历 对key–>keyMapper value–>valueMapper 映射 得到Map集合Map<Integer, List<Student>> listMap = students.stream() .collect(Collectors.toUnmodifiableMap(Student::getId, student -> { List<Student> stus = new ArrayList<>(); stus.add(student); return stus; }));System.out.println(listMap);System.out.println("—————————————\n");// 对流中元素进行遍历 对key–>keyMapper–>Key value–>valueMapper–>Value 映射// 对满足相同Key的元素 Value进行合并Map<Integer, String> lengthName = names.stream() .collect(Collectors.toUnmodifiableMap(String::length, String::toString, (str1, str2) -> str1 + “\t” + str2));System.out.println(lengthName);System.out.println("—————————————\n");三、groupingBy分组// 三、groupingBy分组// 分组 
groupingBy// 对T元素按 key进行分组 –> 分组的依据 classifier–> 分组后的集合 value–> 得到 Map<key,value>/Returns a {@code Collector} implementing a “group by” operation oninput elements of type {@code T}, grouping elements according to a classification function, and returning the results in a {@code Map}.<p>The classification function maps elements to some key type {@code K}.The collector produces a {@code Map<K, List<T>>} whose keys are thevalues resulting from applying the classification function to the inputelements, and whose corresponding values are {@code List}s containing the input elements which map to the associated key under the classificationfunction.<p>There are no guarantees on the type, mutability, serializability, orthread-safety of the {@code Map} or {@code List} objects returned.// 按照key对T进行classifier分组 使用List集合收集分组结果 单线程 线程安全public static <T, K> Collector<T, ?, Map<K, List<T>>>groupingBy(Function<? super T, ? extends K> classifier) { return groupingBy(classifier, toList());}// 按照key对T进行classifier分组 使用自定义收集器downstream 收集分组结果 单线程 线程安全public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream) { return groupingBy(classifier, HashMap::new, downstream);}// 按照key对T进行classifier分组 使用自定义mapFactory 重置downstream的CollectorImpl实现 使用自定义收集器downstream 收集分组结果 单线程 线程安全public static <T, K, D, A, M extends Map<K, D>>Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier, Supplier<M> mapFactory, Collector<? super T, A, D> downstream) { Supplier<A> downstreamSupplier = downstream.supplier(); BiConsumer<A, ? 
super T> downstreamAccumulator = downstream.accumulator(); BiConsumer<Map<K, A>, T> accumulator = (m, t) -> { K key = Objects.requireNonNull(classifier.apply(t), “element cannot be mapped to a null key”); A container = m.computeIfAbsent(key, k -> downstreamSupplier.get()); downstreamAccumulator.accept(container, t); }; BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner()); @SuppressWarnings(“unchecked”) Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory; // 不进行强制类型转换 重构CollectorImpl实现 if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) { return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID); } else { // 进行强制类型转换 重构CollectorImpl实现 @SuppressWarnings(“unchecked”) Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher(); Function<Map<K, A>, M> finisher = intermediate -> { intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v)); @SuppressWarnings(“unchecked”) M castResult = (M) intermediate; return castResult; }; return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID); }}/// 分组System.out.println("—————————————–\n");// select * from students group by sex ;Map<String, List<Student>> sexStudent = students.stream().collect(Collectors.groupingBy(Student::getSex));System.out.println(sexStudent);System.out.println("—————————————–\n");// select sex, count() from students group by sex ;Map<String, Long> sexCount = students.stream().collect(Collectors.groupingBy(Student::getSex, Collectors.counting()));System.out.println(sexCount);System.out.println("—————————————–\n");// select sex,avg(salary) from students group by sex ;Map<String, Double> avgSexSalary = students.stream().collect (Collectors.groupingBy(Student::getSex, Collectors.averagingDouble(Student::getSalary)));System.out.println(avgSexSalary);System.out.println("—————————————–\n");// 嵌套分组 先根据sex分组 再对结果按照addr进行分组Map<String, Map<String, List<Student>>> 
NestedGroupBy = students.stream() .collect(Collectors.groupingBy(Student::getSex, Collectors.groupingBy(Student::getAddr)));System.out.println(NestedGroupBy);System.out.println("—————————————–\n");// 使用自定义收集器downstream 按性别分组 使用HashSet进行 结果收集Map<String, HashSet<Student>> sexHashSet = students.stream() .collect(Collectors.groupingBy(Student::getSex, Collectors.toCollection(HashSet::new)));System.out.println(sexHashSet);System.out.println("—————————————–\n");// 使用自定义收集器downstream 按性别分组 使用HashSet进行 结果收集 重置CollectorImpl实现Map<String, HashSet<Student>> sexCustomCollectorImpl = students.stream() .collect(Collectors.groupingBy(Student::getSex, Hashtable::new, Collectors.toCollection(HashSet::new)));System.out.println(sexCustomCollectorImpl);System.out.println("—————————————–\n");四、groupingByConcurrent分组 // 四、groupingByConcurrent分组 // 分组 groupingByConcurrent // 流的适用条件: // 无序 Collector.Characteristics.UNORDERED // 并发 Collector.Characteristics.CONCURRENT // This is a {@link Collector.Characteristics#CONCURRENT concurrent} and // {@link Collector.Characteristics#UNORDERED unordered} Collector. /* 按照key对T进行classifier分组 使用List集合收集分组结果 public static <T, K> Collector<T, ?, ConcurrentMap<K, List<T>>> groupingByConcurrent(Function<? super T, ? extends K> classifier) { return groupingByConcurrent(classifier, ::new, toList()); }// 按照key对T进行classifier分组 使用自定义收集器downstream 收集分组结果public static <T, K, A, D> Collector<T, ?, ConcurrentMap<K, D>> groupingByConcurrent(Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream) { return groupingByConcurrent(classifier, ConcurrentHashMap::new, downstream); }/ / 按照key对T进行classifier分组 使用自定义累加器mapFactory 重置downstream的CollectorImpl实现 使用自定义收集器downstream 收集分组结果 public static <T, K, A, D, M extends ConcurrentMap<K, D>> Collector<T, ?, M> groupingByConcurrent(Function<? super T, ? extends K> classifier, Supplier<M> mapFactory, Collector<? 
super T, A, D> downstream) { Supplier<A> downstreamSupplier = downstream.supplier(); BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator(); BinaryOperator<ConcurrentMap<K, A>> merger = Collectors.<K, A, ConcurrentMap<K, A>>mapMerger(downstream.combiner()); @SuppressWarnings(“unchecked”) Supplier<ConcurrentMap<K, A>> mangledFactory = (Supplier<ConcurrentMap<K, A>>) mapFactory; BiConsumer<ConcurrentMap<K, A>, T> accumulator; if (downstream.characteristics().contains(Collector.Characteristics.CONCURRENT)) { accumulator = (m, t) -> { K key = Objects.requireNonNull(classifier.apply(t), “element cannot be mapped to a null key”); A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get()); downstreamAccumulator.accept(resultContainer, t); }; } else { accumulator = (m, t) -> { K key = Objects.requireNonNull(classifier.apply(t), “element cannot be mapped to a null key”); A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get()); // 多个操作–>同时操作同一个结果容器–>同一时间,有且只有一个进行实际操作–>线程同步 synchronized (resultContainer) { downstreamAccumulator.accept(resultContainer, t); } }; } if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) { return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_CONCURRENT_ID); } else { @SuppressWarnings(“unchecked”) Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher(); Function<ConcurrentMap<K, A>, M> finisher = intermediate -> { intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v)); @SuppressWarnings(“unchecked”) M castResult = (M) intermediate; return castResult; }; return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_CONCURRENT_NOID); } }/// 分组 无序 并发System.out.println("—————————————–\n");// 按性别分组 使用ArrayList进行 结果收集ConcurrentMap<String, List<Student>> sexStudentConcurrent = students.stream() 
.collect(Collectors.groupingByConcurrent(Student::getSex));System.out.println(sexStudentConcurrent);System.out.println("—————————————–\n");// 使用自定义收集器downstream 按性别分组 使用HashSet进行 结果收集ConcurrentMap<String, HashSet<Student>> sexHashSetConcurrent = students.stream() .collect(Collectors.groupingByConcurrent(Student::getSex, Collectors.toCollection(HashSet::new)));System.out.println(sexHashSetConcurrent);System.out.println("—————————————–\n");// 使用自定义收集器downstream 按性别分组 使用HashSet进行 结果收集 重置CollectorImpl实现ConcurrentReferenceHashMap<String, HashSet<Student>> sexCustomCollectorImplConcurrent = students.stream().collect(Collectors.groupingByConcurrent(Student::getSex, ConcurrentReferenceHashMap::new, Collectors.toCollection(HashSet::new)));System.out.println(sexCustomCollectorImplConcurrent);System.out.println("—————————————–\n");五、partitioningBy分区// 五、partitioningBy分区// 分区 partitioningBy/ 对满足预期的条件 进行分区 使用ArrayList 进行结果的收集public static <T> Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) { return partitioningBy(predicate, toList());}// 对满足预期的条件 进行分区 使用自定义收集器downstream 进行结果的收集public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate, Collector<? super T, A, D> downstream) { BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator(); BiConsumer<Partition<A>, T> accumulator = (result, t) -> downstreamAccumulator.accept(predicate.test(t) ? 
result.forTrue : result.forFalse, t); BinaryOperator<A> op = downstream.combiner(); BinaryOperator<Partition<A>> merger = (left, right) -> new Partition<>(op.apply(left.forTrue, right.forTrue), op.apply(left.forFalse, right.forFalse)); Supplier<Partition<A>> supplier = () -> new Partition<>(downstream.supplier().get(), downstream.supplier().get()); if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) { return new CollectorImpl<>(supplier, accumulator, merger, CH_ID); } else { Function<Partition<A>, Map<Boolean, D>> finisher = par -> new Partition<>(downstream.finisher().apply(par.forTrue), downstream.finisher().apply(par.forFalse)); return new CollectorImpl<>(supplier, accumulator, merger, finisher, CH_NOID); }}*/ // 分区 System.out.println("—————————————–\n"); // select * from students partition by (addr == Sword Art Online) ; Map<Boolean, List<Student>> addrPartition = students.stream().collect (Collectors.partitioningBy(student -> student.getAddr().equals(“Sword Art Online”))); System.out.println(addrPartition); System.out.println("—————————————–\n"); // 嵌套分区 先根据sex分区 再对结果按照addr进行分区 Map<Boolean, Map<Boolean, List<Student>>> NestedPartiton = students.stream() .collect(Collectors.partitioningBy(student -> student.getSex().equals(“Male”), Collectors.partitioningBy(student -> student.getAddr().equals(“Sword Art Online”)))); System.out.println(NestedPartiton); System.out.println("—————————————–\n"); // 使用自定义downstream收集器分区 Map<Boolean, HashSet<Student>> addrCustomPartition = students.stream().collect(Collectors.partitioningBy (student -> student.getAddr().equals(“Sword Art Online”), Collectors.toCollection(HashSet::new))); System.out.println(addrCustomPartition); System.out.println("—————————————–\n"); }}六、测试结果 . 
:: Spring Boot :: (v2.1.2.RELEASE)
2019-02-20 16:58:15.870  INFO 13392 --- [ main] c.j.d.java8.Stream.CollectorsDetail : Starting CollectorsDetail on DESKTOP-87RMBG4 with PID 13392 (started by 46250 in E:\IdeaProjects\design)
2019-02-20 16:58:15.871  INFO 13392 --- [ main] c.j.d.java8.Stream.CollectorsDetail : No active profile set, falling back to default profiles: default
2019-02-20 16:58:16.383  INFO 13392 --- [ main] c.j.d.java8.Stream.CollectorsDetail : Started CollectorsDetail in 0.708 seconds (JVM running for 1.421)
-----------------------------------------
[Kirito, Asuna, Sinon, Yuuki, Alice]
-----------------------------------------
[Yuuki, Asuna, Kirito, Sinon, Alice]
-----------------------------------------
[KIRITO, ASUNA, SINON, YUUKI, ALICE]
-----------------------------------------
[Kirito]
-----------------------------------------
{5=[Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8)], 4=[Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8)], 3=[Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8)], 2=[Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8)], 1=[Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8)]}
-----------------------------------------
{6=Kirito, 5=Asuna Sinon Yuuki Alice}
-----------------------------------------
{Female=[Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8), Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8)], Male=[Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8)]}
-----------------------------------------
{Female=4, Male=1}
-----------------------------------------
{Female=9.99999999E8,
Male=9.99999999E8}—————————————–{Female={Alicization=[Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8)], Sword Art Online=[Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8)], Gun Gale Online=[Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8)], Alfheim Online=[Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8)]}, Male={Sword Art Online=[Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8)]}}—————————————–{Female=[Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8), Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8)], Male=[Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8)]}—————————————–{Female=[Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8), Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8)], Male=[Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8)]}———————————————————————————-{Male=[Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8)], Female=[Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8), Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), Student(id=5, name=Alice, sex=Female, age=14, 
addr=Alicization, salary=9.99999999E8)]}—————————————–{Male=[Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8)], Female=[Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8), Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8)]}—————————————–{Female=[Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8), Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8)], Male=[Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8)]}———————————————————————————-{false=[Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8)], true=[Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8), Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8)]}—————————————–{false={false=[Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8)], true=[Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8)]}, true={false=[], true=[Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, 
salary=9.99999999E8)]}}—————————————–{false=[Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8)], true=[Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8), Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8)]}—————————————– ...
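As a compact, runnable recap of the collectors exercised in this article (toUnmodifiableList, groupingBy with a downstream counting(), and partitioningBy) on a toy data set; requires JDK 10+ for toUnmodifiableList, and the names used here are illustrative:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Main {
    static final List<String> NAMES = List.of("Kirito", "Asuna", "Sinon", "Yuuki", "Alice");

    // groupingBy + counting(): like SELECT length, COUNT(*) ... GROUP BY length
    static Map<Integer, Long> countByLength() {
        return NAMES.stream()
                .collect(Collectors.groupingBy(String::length, Collectors.counting()));
    }

    // partitioningBy: a two-way grouping keyed by a Predicate's boolean result
    static Map<Boolean, List<String>> startsWithK() {
        return NAMES.stream()
                .collect(Collectors.partitioningBy(n -> n.startsWith("K")));
    }

    // toUnmodifiableList: the finisher wraps the result, so mutation throws
    static boolean unmodifiable() {
        List<String> l = NAMES.stream().collect(Collectors.toUnmodifiableList());
        try {
            l.add("Eugeo");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(countByLength());
        System.out.println(startsWithK());
        System.out.println(unmodifiable()); // true
    }
}
```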

February 20, 2019 · 9 min · jiezi

Streams and Lambda Expressions (4): Custom Collectors

1. The custom SetCustomCollector collector

package com.java.design.Stream.CustomCollector;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

/**
 * @author 陈杨
 */
// Collects the elements of a List into a Set, merging equal elements
public class SetCustomCollector<T> implements Collector<T, Set<T>, Set<T>> {

    @Override
    public Supplier<Set<T>> supplier() {
        System.out.println("supplier invoked!");
        // return TreeSet::new;
        return HashSet::new;
    }

    @Override
    public BiConsumer<Set<T>, T> accumulator() {
        System.out.println("accumulator invoked!");
        return Set<T>::add;
    }

    @Override
    public BinaryOperator<Set<T>> combiner() {
        System.out.println("combiner invoked!");
        return (first, last) -> {
            first.addAll(last);
            return first;
        };
    }

    @Override
    public Function<Set<T>, Set<T>> finisher() {
        System.out.println("finisher invoked!");
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        System.out.println("characteristics invoked!");
        return Collections.unmodifiableSet(EnumSet.of(
                Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED));
        // return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
    }
}

2. The custom StudentCustomCollector collector

package com.java.design.Stream.CustomCollector;

import com.java.design.java8.entity.Student;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

/**
 * @author 陈杨
 */
// Collects Student objects into a HashMap<Integer, Student>, keyed by student id
public class StudentCustomCollector implements Collector<Student, List<Student>, Map<Integer, Student>> {

    @Override
    public Supplier<List<Student>> supplier() {
        System.out.println("supplier invoked!");
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<Student>, Student> accumulator() {
        System.out.println("accumulator invoked!");
        return (list, student) -> {
            System.out.println("accumulator:" + Thread.currentThread().getName());
            list.add(student);
        };
    }

    @Override
    public BinaryOperator<List<Student>> combiner() {
        System.out.println("combiner invoked!");
        return (first, last) -> {
            first.addAll(last);
            return first;
        };
    }

    @Override
    public Function<List<Student>, Map<Integer, Student>> finisher() {
        System.out.println("finisher invoked!");
        return list -> {
            Map<Integer, Student> map = new HashMap<>();
            list.forEach(student -> map.put(student.getId(), student));
            return map;
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        System.out.println("Characteristics invoked!");
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.CONCURRENT));
    }

    // Characteristics.IDENTITY_FINISH: the intermediate container type and the result type are
    // the same. If they differ, a ClassCastException is thrown — the finisher must convert the
    // intermediate container into the result type.
    // Characteristics.CONCURRENT: multiple threads operate on the same result container in parallel.
    //   "Indicates that this collector is concurrent, meaning that the result container can
    //    support the accumulator function being called concurrently with the same result
    //    container from multiple threads."
    // parallelStream (multi-threaded) alone operates on multiple result containers -> combiner runs.
    // Characteristics.CONCURRENT + parallelStream: only one result container -> combiner does not run.
    // Beware ConcurrentModificationException: in the concurrent case, the accumulator should perform
    // a single operation on the result container. Do not do extra work inside the functional
    // interface instance the accumulator returns — for example, do not print the collection's
    // contents while adding new elements to it.
    //   "This exception may be thrown by methods that have detected concurrent modification of an
    //    object when such modification is not permissible."
}

3. SetCustomCollectorTest

package com.java.design.java8.Stream.CustomCollector;

import com.java.design.Stream.CustomCollector.SetCustomCollector;
import com.java.design.java8.entity.Student;
import com.java.design.java8.entity.Students;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;
import java.util.Set;

/**
 * @author 陈杨
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class SetCustomCollectorTest {

    private List<Student> students;

    @Before
    public void init() {
        students = new Students().init();
    }

    @Test
    public void testSetCustomCollector() {
        Set<Student> set = students.stream().collect(new SetCustomCollector<>());
        System.out.println(set);
    }

    // JDK source — ReduceOps.makeRef:
    /*
    public static <T, I> TerminalOp<T, I> makeRef(Collector<? super T, I, ?> collector) {
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        BiConsumer<I, ? super T> accumulator = collector.accumulator();
        BinaryOperator<I> combiner = collector.combiner();
        class ReducingSink extends Box<I> implements AccumulatingSink<T, I, ReducingSink> {
            @Override
            public void begin(long size) { state = supplier.get(); }
            @Override
            public void accept(T t) { accumulator.accept(state, t); }
            @Override
            public void combine(ReducingSink other) { state = combiner.apply(state, other.state); }
        }
        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() { return new ReducingSink(); }
            @Override
            public int getOpFlags() {
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                       ? StreamOpFlag.NOT_ORDERED : 0;
            }
        };
    }
    */

    // JDK source — ReferencePipeline.collect:
    /*
    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        } else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }
    */

    // Execution flow — method invocation order:
    // container = evaluate(ReduceOps.makeRef(collector));
    // Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
    // BiConsumer<I, ? super T> accumulator = collector.accumulator();
    // BinaryOperator<I> combiner = collector.combiner();
    // collector.characteristics().contains(Collector.Characteristics.UNORDERED) — is the stream ordered?
    // collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) — is IDENTITY_FINISH present?
    //   ? (R) container — note the cast from the intermediate container type to the result type.

    // Note the unchecked cast (intermediate type vs. result type):
    /*
    CollectorImpl(Supplier<A> supplier, BiConsumer<A, T> accumulator,
                  BinaryOperator<A> combiner, Set<Characteristics> characteristics) {
        this(supplier, accumulator, combiner, castingIdentity(), characteristics);
    }

    @SuppressWarnings("unchecked")
    private static <I, R> Function<I, R> castingIdentity() {
        return i -> (R) i;
    }
    */

    // EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED)
    // With IDENTITY_FINISH, the output is:
    //   supplier invoked!
    //   accumulator invoked!
    //   combiner invoked!
    //   characteristics invoked!
    //   characteristics invoked!
    //   the Set<Student> instance
    // EnumSet.of(Characteristics.UNORDERED)
    // Without IDENTITY_FINISH, the output is:
    //   supplier invoked!
    //   accumulator invoked!
    //   combiner invoked!
    //   characteristics invoked!
    //   characteristics invoked!
    //   finisher invoked!
    //   the Set<Student> instance
}

4. StudentCustomCollectorTest

package com.java.design.java8.Stream.CustomCollector;

import com.java.design.Stream.CustomCollector.StudentCustomCollector;
import com.java.design.java8.entity.Student;
import com.java.design.java8.entity.Students;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;
import java.util.Map;

/**
 * @author 陈杨
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class StudentCustomCollectorTest {

    private List<Student> students;

    @Before
    public void init() {
        students = new Students().init();
    }

    @Test
    public void testStudentCustomCollectorTest() {
        System.out.println("single-threaded");
        Map<Integer, Student> sequentialMap = students.stream().collect(new StudentCustomCollector());
        System.out.println("sequential stream result:\n---------------------------------------\n" + sequentialMap);
        System.out.println("---------------------------------------\n");

        System.out.println("multi-threaded");
        Map<Integer, Student> parallelMap = students.parallelStream().collect(new StudentCustomCollector());
        System.out.println("parallel stream result:\n---------------------------------------\n" + parallelMap);
        System.out.println("---------------------------------------\n");
    }
}

5. Test results

SetCustomCollectorTest output (Spring Boot startup banner omitted):

2019-02-20 17:14:45.547 INFO 3260 --- [ main] c.j.d.j.S.C.SetCustomCollectorTest : Starting SetCustomCollectorTest on DESKTOP-87RMBG4 with PID 3260 (started by 46250 in E:\IdeaProjects\design)
2019-02-20 17:14:45.548 INFO 3260 --- [ main] c.j.d.j.S.C.SetCustomCollectorTest : No active profile set, falling back to default profiles: default
2019-02-20 17:14:46.055 INFO 3260 --- [ main] c.j.d.j.S.C.SetCustomCollectorTest : Started SetCustomCollectorTest in 0.686 seconds (JVM running for 1.43)
supplier invoked!
accumulator invoked!
combiner invoked!
characteristics invoked!
characteristics invoked!
[Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8), Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8), Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8)]

StudentCustomCollectorTest output (Spring Boot startup banner omitted):

2019-02-20 17:15:52.817 INFO 3292 --- [ main] c.j.d.j.S.C.StudentCustomCollectorTest : Starting StudentCustomCollectorTest on DESKTOP-87RMBG4 with PID 3292 (started by 46250 in E:\IdeaProjects\design)
2019-02-20 17:15:52.818 INFO 3292 --- [ main] c.j.d.j.S.C.StudentCustomCollectorTest : No active profile set, falling back to default profiles: default
2019-02-20 17:15:53.354 INFO 3292 --- [ main] c.j.d.j.S.C.StudentCustomCollectorTest : Started StudentCustomCollectorTest in 0.745 seconds (JVM running for 1.439)
single-threaded
supplier invoked!
accumulator invoked!
combiner invoked!
Characteristics invoked!
accumulator:main
accumulator:main
accumulator:main
accumulator:main
accumulator:main
Characteristics invoked!
finisher invoked!
sequential stream result:
---------------------------------------
{1=Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8), 2=Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8), 3=Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), 4=Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), 5=Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8)}
---------------------------------------
multi-threaded
Characteristics invoked!
Characteristics invoked!
supplier invoked!
accumulator invoked!
combiner invoked!
Characteristics invoked!
accumulator:main
accumulator:ForkJoinPool.commonPool-worker-5
accumulator:ForkJoinPool.commonPool-worker-5
accumulator:ForkJoinPool.commonPool-worker-3
accumulator:main
Characteristics invoked!
finisher invoked!
parallel stream result:
---------------------------------------
{1=Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8), 2=Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8), 3=Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), 4=Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), 5=Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8)}
--------------------------------------- ...
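Hand-implementing the Collector interface, as above, is instructive, but the JDK also provides the Collector.of factory, which assembles the same four functions without a named class. A minimal sketch mirroring SetCustomCollector (this condensed variant is my own, not code from the article; note that the Collector.of overload without a finisher automatically adds Characteristics.IDENTITY_FINISH):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CollectorOfSketch {

    // Same supplier/accumulator/combiner as SetCustomCollector, built via Collector.of.
    // This overload (no finisher argument) implies Characteristics.IDENTITY_FINISH.
    public static <T> Collector<T, Set<T>, Set<T>> toSet() {
        return Collector.of(
                HashSet::new,                                            // supplier
                Set::add,                                                // accumulator
                (first, last) -> { first.addAll(last); return first; },  // combiner
                Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        Set<String> names = Stream.of("Kirito", "Asuna", "Asuna").collect(toSet());
        System.out.println(names); // the duplicate "Asuna" is merged away
    }
}
```

The factory is usually preferable when no per-collector state is needed; implementing the interface directly, as the article does, is mainly useful for instrumenting the calls (the "supplier invoked!" traces above).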

February 20, 2019 · 5 min · jiezi

Streams and Lambda Expressions (1): Miscellany

1. Converting a stream to an array or a collection

package com.java.design.java8.Stream;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

/**
 * @author 陈杨
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class ListChange {

    private Stream<String> stream = Stream.of("Kirito", "Asuna", "Illyasviel", "Sakura");

    @Test
    public void testListChange() {
        // Convert a stream to an array
        // System.out.println("-------------stream to array---------------");
        // String[] array = stream.toArray(len -> new String[len]);
        // String[] array = stream.toArray(String[]::new);
        // Arrays.asList(array).stream().forEach(System.out::println);

        // Convert a stream to a collection
        // System.out.println("-------------stream to collection---------------");
        // System.out.println("-------Collectors.toList() source-----------");
        /*
        public static <T> Collector<T, ?, List<T>> toList() {
            return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                    (left, right) -> { left.addAll(right); return left; },
                    CH_ID);
        }
        */
        // List<String> list = stream.collect(Collectors.toList());
        // List<String> linkedList = stream.collect(LinkedList::new, LinkedList::add, LinkedList::addAll);
        List<String> list = stream.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
        list.forEach(System.out::println);
        System.out.println(list.getClass());

        // System.out.println("-------Collectors.toCollection() source-----");
        /*
        public static <T, C extends Collection<T>> Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) {
            return new CollectorImpl<>(collectionFactory, Collection<T>::add,
                    (r1, r2) -> { r1.addAll(r2); return r1; },
                    CH_ID);
        }
        */
        // List<String> list = stream.collect(Collectors.toCollection(ArrayList::new));
        // List<String> linkedList = stream.collect(Collectors.toCollection(ArrayList::new));
        // Set<String> treeSet = stream.collect(Collectors.toCollection(TreeSet::new));
        // Set<String> hashSet = stream.collect(Collectors.toCollection(HashSet::new));
    }
}

Output (Spring Boot startup banner omitted):

2019-02-20 15:47:22.310 INFO 1348 --- [ main] com.java.design.java8.Stream.ListChange : Starting ListChange on DESKTOP-87RMBG4 with PID 1348 (started by 46250 in E:\IdeaProjects\design)
2019-02-20 15:47:22.311 INFO 1348 --- [ main] com.java.design.java8.Stream.ListChange : No active profile set, falling back to default profiles: default
2019-02-20 15:47:22.947 INFO 1348 --- [ main] com.java.design.java8.Stream.ListChange : Started ListChange in 0.914 seconds (JVM running for 1.774)
Kirito
Asuna
Illyasviel
Sakura
class java.util.ArrayList

2. Sorting a collection

package com.java.design.java8.Stream;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.*;

/**
 * @author 陈杨
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class ComparatorDetail {

    private List<String> names;

    @Before
    public void init() {
        names = Arrays.asList("Kirito", "Asuna", "Sinon", "Yuuki", "Alice");
    }

    public void println() {
        System.out.println(names);
        System.out.println("-----------------------------------------\n");
    }

    @Test
    public void testComparatorDetail() {
        // Sort names in ascending order
        Collections.sort(names);
        this.println();

        // Sort names in descending order
        names.sort(Collections.reverseOrder());
        this.println();

        // Sort by name length ascending; for equal lengths, compare the first two
        // characters by their ASCII codes, ascending
        names.sort(Comparator.comparingInt(String::length)
                .thenComparing(str -> str.charAt(0))
                .thenComparing(str -> str.charAt(1)));
        this.println();

        // Sort by name length descending; for equal lengths, compare the first two
        // characters by their ASCII codes, descending
        names.sort(Comparator.comparingInt(String::length)
                .thenComparing(str -> str.charAt(0))
                .thenComparing(str -> str.charAt(1))
                .reversed());
        this.println();

        // Sort by name length; for equal lengths, compare by ASCII code (case-insensitive)
        names.sort(Comparator.comparingInt(String::length)
                .thenComparing(String.CASE_INSENSITIVE_ORDER));
        this.println();
    }
}

Output (Spring Boot startup banner omitted):

2019-02-20 15:58:39.959 INFO 4588 --- [ main] c.j.d.java8.Stream.ComparatorDetail : Starting ComparatorDetail on DESKTOP-87RMBG4 with PID 4588 (started by 46250 in E:\IdeaProjects\design)
2019-02-20 15:58:39.962 INFO 4588 --- [ main] c.j.d.java8.Stream.ComparatorDetail : No active profile set, falling back to default profiles: default
2019-02-20 15:58:40.459 INFO 4588 --- [ main] c.j.d.java8.Stream.ComparatorDetail : Started ComparatorDetail in 0.729 seconds (JVM running for 1.462)
[Alice, Asuna, Kirito, Sinon, Yuuki]
-----------------------------------------
[Yuuki, Sinon, Kirito, Asuna, Alice]
-----------------------------------------
[Alice, Asuna, Sinon, Yuuki, Kirito]
-----------------------------------------
[Kirito, Yuuki, Sinon, Asuna, Alice]
-----------------------------------------
[Alice, Asuna, Sinon, Yuuki, Kirito]
-----------------------------------------

3. Stream map (with lambdas)

package com.java.design.java8.Stream;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @author 陈杨
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class StringOperation {

    private List<String> list = Arrays.asList("Kirito", "Asuna", "Illyasviel", "Sakura");

    private List<List<String>> listMap = Arrays.asList(
            Collections.singletonList("Kirito"),
            Collections.singletonList("Asuna"),
            Collections.singletonList("Illyasviel"),
            Collections.singletonList("Sakura"));

    private List<List<String>> listFlatMap = Arrays.asList(
            Collections.singletonList("Kirito"),
            Collections.singletonList("Asuna"),
            Collections.singletonList("Illyasviel"),
            Collections.singletonList("Sakura"));

    @Test
    public void testStringOperation() {
        // Concatenate the collection's strings, in order, into one long string
        // String concat =
        //     list.stream().collect(StringBuilder::new, StringBuilder::append, StringBuilder::append).toString();
        // String concat = list.stream().collect(Collectors.joining());
        // System.out.println(concat);

        // For each string element, convert all letters to upper case
        System.out.println("-----------------------------------------\n");
        List<String> upperCase = list.stream().map(String::toUpperCase).collect(Collectors.toList());
        upperCase.forEach(System.out::println);

        // For each string element, convert all letters to lower case
        System.out.println("-----------------------------------------\n");
        List<String> lowerCase = list.stream().map(String::toLowerCase).collect(Collectors.toList());
        lowerCase.forEach(System.out::println);

        System.out.println("-----------------------------------------\n");
        System.out.println("the difference between flatMap and map:\n");

        // map: maps each of several lists through a Function, yielding several lists
        System.out.println("-----------------------------------------\n");
        System.out.println("map:");
        List<List<String>> upperMap = listMap.stream()
                .map(list -> list.stream().map(String::toUpperCase)
                        .collect(Collectors.toList()))
                .collect(Collectors.toList());
        upperMap.forEach(System.out::println);

        // flatMap: flattens the lists first, then maps through a Function, yielding one list
        System.out.println("-----------------------------------------\n");
        System.out.println("flatMap:");
        List<String> upperFlatMap = listFlatMap.stream()
                .flatMap(list -> list.stream().map(String::toUpperCase))
                .collect(Collectors.toList());
        upperFlatMap.forEach(System.out::println);
    }
}

Output (Spring Boot startup banner omitted):

2019-02-20 15:50:07.423 INFO 8208 --- [ main] c.j.design.java8.Stream.StringOperation : Starting StringOperation on DESKTOP-87RMBG4 with PID 8208 (started by 46250 in E:\IdeaProjects\design)
2019-02-20 15:50:07.424 INFO 8208 --- [ main] c.j.design.java8.Stream.StringOperation : No active profile set, falling back to default profiles: default
2019-02-20 15:50:07.917 INFO 8208 --- [ main] c.j.design.java8.Stream.StringOperation : Started StringOperation in 0.717 seconds (JVM running for 1.5)
-----------------------------------------
KIRITO
ASUNA
ILLYASVIEL
SAKURA
-----------------------------------------
kirito
asuna
illyasviel
sakura
-----------------------------------------
the difference between flatMap and map:
-----------------------------------------
map:
[KIRITO]
[ASUNA]
[ILLYASVIEL]
[SAKURA]
-----------------------------------------
flatMap:
KIRITO
ASUNA
ILLYASVIEL
SAKURA

4. Internal vs. external iteration

package com.java.design.java8.Stream;

import com.java.design.java8.entity.Student;
import com.java.design.java8.entity.Students;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.*;

/**
 * @author 陈杨
 */
// The essence of iteration
@SpringBootTest
@RunWith(SpringRunner.class)
public class IterativeEssence {

    private List<Student> students;

    @Before
    public void init() {
        students = new Students().init();
    }

    @Test
    public void testIterativeEssence() {
        // Requirement: select name from students where age > 14 and addr = "Sword Art Online" order by id desc;

        // External iteration
        System.out.println("-----------------------------------------\n");
        System.out.println("external iteration");
        List<Student> list = new ArrayList<>();
        for (Student student : students) {
            if (student.getAge() > 14 && student.getAddr().equals("Sword Art Online")) {
                list.add(student);
            }
        }
        list.sort(Comparator.comparingInt(Student::getId));
        for (Student student : list) {
            System.out.println(student.getName());
        }

        // Internal iteration
        System.out.println("-----------------------------------------\n");
        System.out.println("internal iteration");
        students.stream()
                .filter(student -> student.getAge() > 14)
                .filter(student -> student.getAddr().equals("Sword Art Online"))
                .sorted(Comparator.comparingInt(Student::getId))
                .forEach(student -> System.out.println(student.getName()));

        // Notes:
        // Internal iteration, like a SQL statement, is declarative.
        // Collections are about data and data storage.
        // Streams are about computation over data.
        // Intermediate stream operations all return a Stream; the type parameter depends on the operation.
        // Terminal stream operations either return nothing (forEach) or return some other type.
    }
}

Output (Spring Boot startup banner omitted):

2019-02-20 15:53:03.633 INFO 8864 --- [ main] c.j.d.java8.Stream.IterativeEssence : Starting IterativeEssence on DESKTOP-87RMBG4 with PID 8864 (started by 46250 in E:\IdeaProjects\design)
2019-02-20 15:53:03.640 INFO 8864 --- [ main] c.j.d.java8.Stream.IterativeEssence : No active profile set, falling back to default profiles: default
2019-02-20 15:53:04.167 INFO 8864 --- [ main] c.j.d.java8.Stream.IterativeEssence : Started IterativeEssence in 0.746 seconds (JVM running for 1.455)
-----------------------------------------
external iteration
Kirito
Asuna
-----------------------------------------
internal iteration
Kirito
Asuna

5. Sequential vs. parallel streams: a simple performance test

package com.java.design.java8.Stream;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * @author 陈杨
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ErgodicString {

    private List<String> uuid;
    private long startTime;
    private long endTime;
    private long parallelEndTime;

    @Before
    public void init() {
        uuid = new ArrayList<>(10000000);
        IntStream.range(0, 10000000).forEach(i -> uuid.add(UUID.randomUUID().toString()));
    }

    public void testNormal() {
        startTime = System.nanoTime();
        uuid.stream().sorted().collect(Collectors.toList());
        endTime = System.nanoTime();
        long millis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
        System.out.println("sequential " + millis);
    }

    public void testParallel() {
        startTime = System.nanoTime();
        uuid.parallelStream().sorted().collect(Collectors.toList());
        parallelEndTime = System.nanoTime();
        long millis = TimeUnit.NANOSECONDS.toMillis(parallelEndTime - startTime);
        System.out.println("parallel " + millis);
    }

    @Test
    public void testErgodicString() {
        List<String> list = Arrays.asList("Kirito", "Asuna", "Illyasviel", "Sakura");
        // Requirement: upper-case every letter of each element and put the results into a collection
        System.out.println("--------------------sequential stream---------------------");
        // spliterator: splitting iterator
        // sequential stream() — single-threaded processing
        /*
        default Stream<E> stream() {
            return StreamSupport.stream(spliterator(), false);
        }
        */
        // List<String> collect = list.stream().map(str -> str.toUpperCase()).collect(Collectors.toList());
        // R apply(T t); toUpperCase() takes no argument,
        // but the object it is invoked on serves as the input T; the returned object is R
        // List<String> collect = list.stream().map(String::toUpperCase).collect(Collectors.toList());
        Function<String, String> function = String::toUpperCase;
        // equivalent to (String str) -> str.toUpperCase()
        // Method reference of the form ClassType::instanceMethod: in the corresponding
        // lambda, the first input parameter is the object the method is invoked on
        List<String> collect = list.stream().map(function).collect(Collectors.toList());
        collect.forEach(System.out::println);
        this.testNormal();

        System.out.println("-----------------parallel stream------------------");
        // parallel parallelStream() — multi-threaded processing
        /*
        default Stream<E> parallelStream() {
            return StreamSupport.stream(spliterator(), true);
        }
        */
        List<String> parallelCollect = list.parallelStream().map(str -> str.toUpperCase()).collect(Collectors.toList());
        parallelCollect.forEach(System.out::println);
        this.testParallel();
    }
}

Output (Spring Boot startup banner omitted):

2019-02-20 15:54:54.321 INFO 7356 --- [ main] c.j.design.java8.Stream.ErgodicString : Starting ErgodicString on DESKTOP-87RMBG4 with PID 7356 (started by 46250 in E:\IdeaProjects\design)
2019-02-20 15:54:54.323 INFO 7356 --- [ main] c.j.design.java8.Stream.ErgodicString : No active profile set, falling back to default profiles: default
2019-02-20 15:54:54.817 INFO 7356 --- [ main] c.j.design.java8.Stream.ErgodicString : Started ErgodicString in 0.705 seconds (JVM running for 1.528)
--------------------sequential stream---------------------
KIRITO
ASUNA
ILLYASVIEL
SAKURA
sequential 10590
-----------------parallel stream------------------
KIRITO
ASUNA
ILLYASVIEL
SAKURA
parallel 3313 ...
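Section 1 above leaves the stream-to-array and Collectors.toCollection variants commented out. A small runnable sketch of both, with the same sample names (the class name and the TreeSet choice are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class ToCollectionSketch {

    // Stream -> array, via the array-constructor method reference.
    public static String[] toArray(List<String> names) {
        return names.stream().toArray(String[]::new);
    }

    // Stream -> a caller-chosen collection type; toCollection takes the container's supplier.
    public static TreeSet<String> toTreeSet(List<String> names) {
        return names.stream().collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        List<String> names = List.of("Kirito", "Asuna", "Illyasviel", "Sakura");
        System.out.println(Arrays.toString(toArray(names)));
        System.out.println(toTreeSet(names)); // sorted and deduplicated by TreeSet
    }
}
```

Compared with the three-argument collect(ArrayList::new, ArrayList::add, ArrayList::addAll) used in the article's code, Collectors.toCollection expresses the same supplier/accumulator/combiner triple in one call while letting the caller pick the concrete container.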

February 20, 2019 · 7 min · jiezi

Streams and Lambda Expressions (2): Stream Collectors and the Collector Interface

1. Stream collectors and the Collector interface

package com.java.design.java8.Stream;

import com.java.design.java8.entity.Student;
import com.java.design.java8.entity.Students;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.*;
import java.util.stream.Collectors;

/**
 * @author 陈杨
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class CollectorDetail {

    private List<Student> students;

    @Before
    public void init() {
        students = new Students().init();
    }

    @Test
    public void testCollectorDetail() {
        // collect — the Collector interface
        // T: the element type of the reduction, i.e. the stream's element type
        // A: the mutable accumulation type of the reduction
        // R: the result type of the reduction
        // public interface Collector<T, A, R>
        // Collector — a mutable reduction operation:
        //   "A mutable reduction operation that accumulates input elements into a mutable result
        //    container, optionally transforming the accumulated result into a final representation
        //    after all input elements have been processed. Reduction operations can be performed
        //    either sequentially or in parallel."
        // Collectors provides Collector implementations — in effect, a Collector factory:
        //   "The class {@link Collectors} provides implementations of many common mutable reductions."

2. The four functions that make up a Collector

        // A Collector accumulates into a container via four cooperating functions,
        // optionally performing a final transform:
        //   supplier    — creates a new result container
        //   accumulator — incorporates a new data element into a result container
        //   combiner    — merges two result containers (handles concurrent threads)
        //   finisher    — performs an optional final transform on the container
        //   "A {@code Collector} is specified by four functions that work together to
        //    accumulate entries into a mutable result container, and optionally perform
        //    a final transform on the result. They are:
        //    creation of a new result container ({@link #supplier()})
        //    incorporating a new data element into a result container ({@link #accumulator()})
        //    combining two result containers into one ({@link #combiner()})
        //    performing an optional final transform on the container ({@link #finisher()})"

3. combiner

        /*
         * A function that accepts two partial results and merges them. The
         * combiner function may fold state from one argument into the other and
         * return that, or may return a new result container.
         *
         * BinaryOperator<A> combiner();
         */
        // supplier creates a result container -> accumulator accumulates -> (in parallel)
        // the input is partitioned into per-partition containers -> combiner merges them
        /*
         * A sequential implementation of a reduction using a collector would create a single
         * result container using the supplier function, and invoke the accumulator function once
         * for each input element. A parallel implementation would partition the input, create a
         * result container for each partition, accumulate the contents of each partition into a
         * subresult for that partition, and then use the combiner function to merge the
         * subresults into a combined result.
         */

4. The identity and associativity constraints

        // To ensure that sequential and parallel executions produce equivalent results,
        // the collector functions must satisfy the identity and associativity constraints.
        // identity constraint: for any partially accumulated result, combining it with an empty
        // result container must produce an equivalent result: a == combiner.apply(a, supplier.get())
        /*
         * The identity constraint says that for any partially accumulated result, combining it
         * with an empty result container must produce an equivalent result. That is, for a
         * partially accumulated result {@code a} that is the result of any series of accumulator
         * and combiner invocations, {@code a} must be equivalent to
         * {@code combiner.apply(a, supplier.get())}.
         */
        // associativity constraint: computing sequentially and computing with a split
        // must produce equivalent results:
        /*
         * The associativity constraint says that splitting the computation must produce an
         * equivalent result. That is, for any input elements {@code t1} and {@code t2}, the
         * results {@code r1} and {@code r2} in the computation below must be equivalent:
         *
         *     A a1 = supplier.get();
         *     accumulator.accept(a1, t1);
         *     accumulator.accept(a1, t2);
         *     R r1 = finisher.apply(a1);  // result without splitting
         *
         *     A a2 = supplier.get();
         *     accumulator.accept(a2, t1);
         *     A a3 = supplier.get();
         *     accumulator.accept(a3, t2);
         *     R r2 = finisher.apply(combiner.apply(a2, a3));  // result with splitting
         */

5. How reduction is implemented

        // Two ways to express a reduction:
        //   list.stream().reduce(...)                       — immutable values
        //   list.stream().collect(Collectors.reducing(...)) — mutable containers
        // A single thread may produce consistent results either way, but an incorrect
        // collector goes wrong under multiple threads.
        /*
         * Libraries that implement reduction based on {@code Collector}, such as
         * {@link Stream#collect(Collector)}, must adhere to the following constraints:
         */
        // The first argument passed to the accumulator function, both arguments passed to the
        // combiner function, and the argument passed to the finisher function must be the result
        // of a previous invocation of the supplier, accumulator, or combiner functions.
        // Note the accumulation type A in the signatures:
        //   Supplier<A> supplier();
        //   BiConsumer<A, T> accumulator();
        //   BinaryOperator<A> combiner();
        //   Function<A, R> finisher();
        // The results of supplier/accumulator/combiner are either passed on to the next
        // supplier/accumulator/combiner/finisher invocation, or returned to the caller of the
        // reduction; the implementation does nothing else with them.
        /*
         * The implementation should not do anything with the result of any of the result
         * supplier, accumulator, or combiner functions other than to pass them again to the
         * accumulator, combiner, or finisher functions, or return them to the caller of the
         * reduction operation.
         */
        // If a result is passed to the combiner or finisher and the same object is not returned,
        // that result is never used again — it has been consumed and a new object produced.
        // Once a result is passed to the combiner or finisher, it is never passed to the
        // accumulator again: all stream elements have been delivered and accumulation is done.
        // Non-concurrent (single result container per thread):
        /*
         * For non-concurrent collectors, any result returned from the result supplier,
         * accumulator, or combiner functions must be serially thread-confined. This enables
         * collection to occur in parallel without the {@code Collector} needing to implement any
         * additional synchronization. The reduction implementation must manage that the input is
         * properly partitioned, that partitions are processed in isolation, and combining happens
         * only after accumulation is complete.
         */
        // Concurrent (multiple threads, one shared container):
        /*
         * For concurrent collectors, an implementation is free to (but not required to) implement
         * reduction concurrently. A concurrent reduction is one where the accumulator function is
         * called concurrently from multiple threads, using the same concurrently-modifiable
         * result container, rather than keeping the result isolated during accumulation. A
         * concurrent reduction should only be applied if the collector has the
         * {@link Characteristics#UNORDERED} characteristics or if the originating data is unordered.
         */

6. How Characteristics lets Collectors be optimized

        /*
         * Collectors also have a set of characteristics that provide hints that can be used by a
         * reduction implementation to provide better performance.
         *
         * Characteristics indicating properties of a {@code Collector}, which can
         * be used to optimize reduction implementations.
         *
         * enum Characteristics {
         *
         *   Indicates that this collector is <em>concurrent</em>, meaning that
         *   the result container can support the accumulator function being
         *   called concurrently with the same result container from multiple
         *   threads.
         *
         *   If a {@code CONCURRENT} collector is not also {@code UNORDERED},
         *   then it should only be evaluated concurrently if applied to an
         *   unordered data source.
         */
        //   CONCURRENT — multi-threaded concurrent accumulation; the container must be
        //   thread-safe; use an unordered data source, in combination with UNORDERED
        /*
         *   Indicates that the collection operation does not commit to preserving
         *   the encounter order of input elements. (This might be true if the
         *   result container has no intrinsic order, such as a {@link Set}.)
         */
        //   UNORDERED — an unordered collection
        /*
         *   Indicates that the finisher function is the identity function and
         *   can be elided. If set, it must be the case that an unchecked cast
         *   from A to R will succeed.
         */
        //   IDENTITY_FINISH — an unchecked cast from A to R
        // }

7. The Collector interface and Collectors

        // Collectors -> a simple Collector implementation via the static inner class CollectorImpl.
        // Why define CollectorImpl as a static inner class of Collectors?
        // Collectors is a factory/helper class whose methods are all static: it hands developers
        // the most common Collector implementations via plain static calls, and those calls are
        // backed by CollectorImpl. Since CollectorImpl is used only inside Collectors, the two
        // are kept together.

8. Test methods

        // Accumulate names into a List
        List<String> snameList = students.stream()
                .map(Student::getName).collect(Collectors.toList());
        System.out.println("student names accumulated into an ArrayList: " + snameList.getClass());
        System.out.println(snameList);
        System.out.println("-----------------------------------------\n");

        // Accumulate names into a TreeSet
        Set<String> snameTree = students.stream()
                .map(Student::getName).collect(Collectors.toCollection(TreeSet::new));
        System.out.println("student names accumulated into a TreeSet: " + snameTree.getClass());
        System.out.println(snameTree);
        System.out.println("-----------------------------------------\n");

        // Convert elements to strings and concatenate them, separated by commas
        String joinedStudents = students.stream()
                .map(Student::toString).collect(Collectors.joining(","));
        System.out.println("students joined into one comma-separated string: " + joinedStudents);
        System.out.println("-----------------------------------------\n");

        // Compute the sum of the students' salaries
        double totalSalary = students.stream()
                .mapToDouble(Student::getSalary).sum();
        System.out.println("total salary: " + totalSalary);
        System.out.println("-----------------------------------------\n");

        // Print the student with the minimum id
        System.out.println("student with the minimum id:");
        students.stream()
                .min(Comparator.comparingInt(Student::getId))
                .ifPresent(System.out::println);
        System.out.println("-----------------------------------------\n");

        // Print the student with the maximum id
        System.out.println("student with the maximum id:");
        students.stream()
                .max(Comparator.comparingInt(Student::getId))
                .ifPresent(System.out::println);
        System.out.println("-----------------------------------------\n");

        // Compute the students' average age
        Double avgAge = students.stream()
                .collect(Collectors.averagingInt(Student::getAge));
        System.out.println("average age: " + avgAge);
        System.out.println("-----------------------------------------\n");

        // Print summary statistics of the students' ages
        IntSummaryStatistics ageSummaryStatistics = students.stream()
                .collect(Collectors.summarizingInt(Student::getAge));
        System.out.println("age summary statistics: " + ageSummaryStatistics);
        System.out.println("-----------------------------------------\n");

        // Group by sex, taking the student with the smallest id per group.
        // Collectors.minBy by itself returns Optional<Student>; since the result is known to be
        // non-null, wrap it with Collectors.collectingAndThen and unwrap via Optional::get.
        Map<String, Student> minIdStudent = students.stream().collect(
                Collectors.groupingBy(Student::getSex,
                        Collectors.collectingAndThen(
                                Collectors.minBy(Comparator.comparingInt(Student::getId)),
                                Optional::get)));
        System.out.println(minIdStudent);
        System.out.println("-----------------------------------------\n");
    }
}

9. Test results

Output (Spring Boot startup banner omitted):

2019-02-20 16:11:56.217 INFO 17260 --- [ main] c.j.design.java8.Stream.CollectorDetail : Starting CollectorDetail on DESKTOP-87RMBG4 with PID 17260 (started by 46250 in E:\IdeaProjects\design)
2019-02-20 16:11:56.223 INFO 17260 --- [ main] c.j.design.java8.Stream.CollectorDetail : No active profile set, falling back to default profiles: default
2019-02-20 16:11:56.699 INFO 17260 --- [ main] c.j.design.java8.Stream.CollectorDetail : Started CollectorDetail in 0.678 seconds (JVM running for 1.401)
-----------------------------------------
student names accumulated into an ArrayList: class java.util.ArrayList
[Kirito, Asuna, Sinon, Yuuki, Alice]
-----------------------------------------
student names accumulated into a TreeSet: class java.util.TreeSet
[Alice, Asuna, Kirito, Sinon, Yuuki]
-----------------------------------------
students joined into one comma-separated string: Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8),Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online,
salary=9.99999999E8),Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8),Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8),Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8)—————————————–学生总薪水:4.999999995E9—————————————–最小id的学生信息:Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8)—————————————–最大id的学生信息:Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8)—————————————–学生平均年龄:16.0—————————————–学生年龄的汇总信息:IntSummaryStatistics{count=5, sum=80, min=14, average=16.000000, max=18}—————————————–{Female=Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8), Male=Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8)}—————————————– ...
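The pieces above — supplier, accumulator, combiner, finisher, and `Characteristics` — can be tied together with a small hand-written collector. The sketch below uses the public `Collector.of` factory to reimplement the behavior of `Collectors.joining(sep)`; the class name is ours, not JDK code:

```java
import java.util.StringJoiner;
import java.util.stream.Collector;
import java.util.stream.Stream;

// A hand-rolled equivalent of Collectors.joining(sep): the four functions
// map directly onto supplier/accumulator/combiner/finisher from section 5.
public class JoiningCollector {

    public static Collector<CharSequence, StringJoiner, String> joining(String sep) {
        return Collector.of(
                () -> new StringJoiner(sep),  // supplier: fresh result container
                StringJoiner::add,            // accumulator: fold one element in
                StringJoiner::merge,          // combiner: merge two partial containers
                StringJoiner::toString);      // finisher: A -> R conversion
        // No IDENTITY_FINISH here, because A (StringJoiner) != R (String).
    }

    public static String demo() {
        return Stream.of("a", "b", "c").collect(joining(","));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // a,b,c
    }
}
```

Note that because the finisher is a real conversion, a library is free to call the accumulator many times on one `StringJoiner` and merge partial joiners with the combiner — exactly the split/no-split equivalence shown at the top of this section.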

February 20, 2019 · 7 min · jiezi

[kafka KSQL] Game Log Statistics & Analysis (3)

Following up on [kafka KSQL] Game Log Statistics & Analysis (2), this article demonstrates KSQL's join queries by example.

Create another topic:

```
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic prop-normalized
```

Write data into the new topic:

```
bin/kafka-console-producer --broker-list localhost:9092 --topic prop-normalized
>{"user__name":"lzb", "prop__id":"id1"}
```

Create a Stream from the prop-normalized topic:

```sql
CREATE STREAM PROP_USE_EVENT \
  (user__name VARCHAR, \
   prop__id VARCHAR) \
  WITH (KAFKA_TOPIC='prop-normalized', \
        VALUE_FORMAT='json');
```

Re-key the stream on user__name:

```sql
CREATE STREAM PROP_USE_EVENT_REKEY AS \
  SELECT * FROM PROP_USE_EVENT \
  PARTITION BY user__name;
```

Find all players who finished 3 games without ever using a prop.

First aggregate every player's games into the table USER_SCORE_TABLE (already created in the previous article):

```sql
CREATE TABLE USER_SCORE_TABLE AS \
  SELECT username, COUNT(*) AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum \
  FROM USER_SCORE_EVENT \
  WHERE reason = 'game' \
  GROUP BY username;
```

Then aggregate every player's prop usage into the table USER_PROP_TABLE:

```sql
CREATE TABLE USER_PROP_TABLE AS \
  SELECT username, COUNT(*) \
  FROM PROP_USE_EVENT_REKEY \
  GROUP BY username;
```

Finally, LEFT JOIN the two tables:

```sql
SELECT s.username AS username \
FROM USER_SCORE_TABLE s \
LEFT JOIN USER_PROP_TABLE p \
ON s.username = p.username;
```

...
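For readers without a Kafka cluster at hand, the shape of this query — two per-player aggregations plus a left join — can be sketched in plain Java streams over in-memory lists. The class and method names below are ours, not KSQL's, and the "no prop use" filter corresponds to the left-join rows with no match on the prop side:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// In-memory analogue of the KSQL LEFT JOIN: count games per player, count prop
// uses per player, then keep players with >= 3 games and zero prop uses.
public class LeftJoinDemo {

    public static Set<String> playersWithThreeGamesNoProps(
            List<String> gamePlayers, List<String> propUsers) {
        Map<String, Long> games = gamePlayers.stream()
                .collect(Collectors.groupingBy(p -> p, Collectors.counting()));
        Set<String> usedProp = new HashSet<>(propUsers);
        return games.entrySet().stream()
                .filter(e -> e.getValue() >= 3)              // USER_SCORE_TABLE side
                .filter(e -> !usedProp.contains(e.getKey())) // unmatched LEFT JOIN rows
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<String> games = List.of("lza", "lza", "lza", "lzb", "lzb", "lzb", "lzc");
        List<String> props = List.of("lzb");
        System.out.println(playersWithThreeGamesNoProps(games, props)); // [lza]
    }
}
```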

January 9, 2019 · 1 min · jiezi

[kafka KSQL] Game Log Statistics & Analysis (2)

Following up on [kafka KSQL] Game Log Statistics & Analysis (1), this article shows KSQL's WINDOW features.

Sample log record for testing:

```json
{"cost":7, "epoch":1512342568296,
 "gameId":"2017-12-04_07:09:28_高手2区_500_015_185175",
 "gameType":"situan",
 "gamers": [{"balance":4405682,"delta":-60,"username":"lza"},
            {"balance":69532,"delta":-60,"username":"lzb"},
            {"balance":972120,"delta":-60,"username":"lzc"},
            {"balance":23129,"delta":180,"username":"lze"}],
 "reason":"game"}
```

KSQL supports three kinds of windows.

Players who finished at least 3 games within each 2-minute window — build a table over a tumbling window:

```sql
CREATE TABLE users_per_minute AS \
  SELECT username, COUNT(*) AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum, \
         WINDOWSTART() AS win_start, WINDOWEND() AS win_end \
  FROM USER_SCORE_EVENT \
  WINDOW TUMBLING (SIZE 2 MINUTE) \
  WHERE reason = 'game' \
  GROUP BY username;
```

Filter out the players with game_count of at least 3:

```sql
SELECT username, game_count, win_start, win_end FROM users_per_minute WHERE game_count >= 3;
```

Output:

```
lze | 6 | 1546744320000 | 1546744440000
lzc | 6 | 1546744320000 | 1546744440000
lza | 6 | 1546744320000 | 1546744440000
lzb | 6 | 1546744320000 | 1546744440000
lzb | 3 | 1546744440000 | 1546744560000
lzc | 3 | 1546744440000 | 1546744560000
lza | 3 | 1546744440000 | 1546744560000
lze | 3 | 1546744440000 | 1546744560000
```

Players who ever finished 3 games within some 10-minute span — not one fixed 10-minute window; any 10-minute span counts. Create a table over a hopping window:

```sql
CREATE TABLE users_hopping_10_minute AS \
  SELECT username, COUNT(*) AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum, \
         TIMESTAMPTOSTRING(WindowStart(), 'yyyy-MM-dd HH:mm:ss') AS win_start, \
         TIMESTAMPTOSTRING(WindowEnd(), 'yyyy-MM-dd HH:mm:ss') AS win_end \
  FROM USER_SCORE_EVENT \
  WINDOW HOPPING (SIZE 10 MINUTE, ADVANCE BY 30 SECONDS) \
  WHERE reason = 'game' \
  GROUP BY username;
```

Filter out the players with game_count of at least 3:

```sql
SELECT username \
FROM users_hopping_10_minute \
WHERE game_count >= 3 \
GROUP BY username;
```

...
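A tumbling window is simply a set of fixed, non-overlapping time buckets, so the grouping above can be mimicked in plain Java by bucketing each event's epoch into 2-minute windows. The `Event` record and counts below are toy stand-ins (and the record syntax needs Java 16+):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Tumbling-window analogue: bucket (window start, username) -> game count,
// like the users_per_minute table above.
public class TumblingWindowDemo {

    record Event(String username, long epoch) {}

    static final long WINDOW_MS = 2 * 60 * 1000;

    public static Map<Long, Map<String, Long>> countPerWindow(List<Event> events) {
        return events.stream().collect(Collectors.groupingBy(
                e -> (e.epoch() / WINDOW_MS) * WINDOW_MS,   // window start timestamp
                Collectors.groupingBy(Event::username, Collectors.counting())));
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("lza", 0), new Event("lza", 1_000), new Event("lza", 2_000),
                new Event("lzb", 121_000));
        // lza has 3 games in window 0; lzb has 1 game in window 120000
        System.out.println(countPerWindow(events));
    }
}
```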

January 6, 2019 · 1 min · jiezi

[kafka KSQL] Game Log Statistics & Analysis (1)

Using game settlement logs as the example, this walks through analyzing logs with KSQL.

Start confluent:

```
cd ~/Documents/install/confluent-5.0.1/
bin/confluent start
```

List kafka topics:

```
bin/kafka-topics --list --zookeeper localhost:2181
```

Create the topic that receives game settlement logs:

```
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic score-normalized
```

Use the console producer to write a log into the topic:

```
bin/kafka-console-producer --broker-list localhost:9092 --topic score-normalized
> {"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1区_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
```

Use the console consumer to check that the log was written:

```
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic score-normalized --from-beginning
```

Start the KSQL client:

```
bin/ksql http://localhost:8088
```

After the KSQL banner appears, list the kafka topics from the ksql terminal:

```
ksql> show topics;
```

Print the messages in the topic:

```
PRINT 'score-normalized';
```

which shows something like:

```
Format:STRING
19-1-5 下午11时59分31秒 , NULL , {"cost":7, "epoch":1512342568296, ...}
```

The first field is the message time; the second (NULL) is the message key, NULL because kafka-console-producer sends no key by default; the rest is the message body.

Create a Stream from topic score-normalized:

```sql
CREATE STREAM SCORE_EVENT \
  (epoch BIGINT, \
   gameType VARCHAR, \
   cost INTEGER, \
   gamers ARRAY< \
     STRUCT< \
       username VARCHAR, \
       balance BIGINT, \
       delta BIGINT \
     > \
   >, \
   gameId VARCHAR, \
   tax BIGINT, \
   reason VARCHAR) \
  WITH (KAFKA_TOPIC='score-normalized', \
        VALUE_FORMAT='JSON');
```

Delete a stream:

```sql
DROP STREAM stream_name;
```

If queries are still reading from the stream, you get an error like:

```
Cannot drop USER_SCORE_EVENT. The following queries read from this source: []. The following queries write into this source: [CSAS_USER_SCORE_EVENT_2, InsertQuery_4, InsertQuery_5, InsertQuery_3]. You need to terminate them before dropping USER_SCORE_EVENT.
```

Stop those queries with TERMINATE before dropping the stream:

```sql
TERMINATE CSAS_USER_SCORE_EVENT_2;
TERMINATE InsertQuery_4;
```

Query from the earliest record:

```
ksql> SET 'auto.offset.reset' = 'earliest';
```

Query all rows of the stream:

```
ksql> SELECT * FROM SCORE_EVENT;
```

Example row:

```
1546702389664 | null | 1512342568296 | situan | 7 | [{USERNAME=0791754000, BALANCE=4405682, DELTA=-60}, {USERNAME=70837999, BALANCE=69532, DELTA=-60}, {USERNAME=abc6378303, BALANCE=972120, DELTA=-60}, {USERNAME=a137671268, BALANCE=23129, DELTA=180}] | 2017-12-04_07:09:28_高手1区_200_015_185175 | null | xiayu
```

Column 1 is the record timestamp, column 2 the record key, and the remaining columns are the message fields in the order declared when the stream was created. The second-to-last null appears because the message has no tax field.

Count the games played on 2017-12-04 — add a game_date column to aggregate on:

```sql
CREATE STREAM SCORE_EVENT_WITH_DATE AS \
  SELECT SUBSTRING(gameId, 0, 10) AS game_date, * \
  FROM SCORE_EVENT;

SELECT game_date, COUNT(*) \
  FROM SCORE_EVENT_WITH_DATE \
  WHERE game_date = '2017-12-04' AND reason = 'game' \
  GROUP BY game_date;
```

KSQL does not yet support queries like:

```sql
SELECT COUNT(*) \
  FROM SCORE_EVENT \
  WHERE gameId LIKE '2017-12-04_%';
```

Count the distinct players that took part in games. Because one log line carries several players' results, split each player out into its own event and merge them all into one stream, USER_SCORE_EVENT:

```sql
CREATE STREAM USER_SCORE_EVENT AS \
  SELECT epoch, gameType, cost, gameId, tax, reason, gamers[0]->username AS username, gamers[0]->balance AS balance, gamers[0]->delta AS delta \
  FROM SCORE_EVENT;

INSERT INTO USER_SCORE_EVENT \
  SELECT epoch, gameType, cost, gameId, tax, reason, gamers[1]->username AS username, gamers[1]->balance AS balance, gamers[1]->delta AS delta \
  FROM SCORE_EVENT;

INSERT INTO USER_SCORE_EVENT \
  SELECT epoch, gameType, cost, gameId, tax, reason, gamers[2]->username AS username, gamers[2]->balance AS balance, gamers[2]->delta AS delta \
  FROM SCORE_EVENT;

INSERT INTO USER_SCORE_EVENT \
  SELECT epoch, gameType, cost, gameId, tax, reason, gamers[3]->username AS username, gamers[3]->balance AS balance, gamers[3]->delta AS delta \
  FROM SCORE_EVENT;
```

Aggregate each player's total games, total win/loss, and total tax contributed into the table USER_SCORE_TABLE:

```sql
CREATE TABLE USER_SCORE_TABLE AS \
  SELECT username, COUNT(*) AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum \
  FROM USER_SCORE_EVENT \
  WHERE reason = 'game' \
  GROUP BY username;
```

View all rows of USER_SCORE_TABLE:

```
ksql> SELECT * FROM USER_SCORE_TABLE;
1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
1546709352758 | 0791754000 | 0791754000 | 4 | -240 | 0
1546709338711 | a137671268 | a137671268 | 4 | 720 | 0
1546709352758 | abc6378303 | abc6378303 | 4 | -240 | 0
```

Query one player's games, win/loss, and tax:

```
ksql> SELECT * FROM USER_SCORE_TABLE WHERE username = '70837999';
1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
```

Count the distinct players — add a dummy column to group on:

```sql
CREATE TABLE USER_SCORE_WITH_TAG AS \
  SELECT 1 AS tag, * FROM USER_SCORE_TABLE;

SELECT tag, COUNT(username) \
FROM USER_SCORE_WITH_TAG \
GROUP BY tag;
```

KSQL's WINDOW features are covered in the next part.

...
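The four statements that explode the gamers array one index at a time have a one-call analogue in plain Java streams: flatMap. A hedged sketch with toy record types (not the article's real schema, and record syntax needs Java 16+):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Flattening analogue of the INSERT INTO USER_SCORE_EVENT block: explode each
// game record into one event per gamer, then aggregate per username.
public class FlattenDemo {

    record Gamer(String username, long delta) {}
    record Game(List<Gamer> gamers) {}

    public static Map<String, Long> deltaSumPerUser(List<Game> games) {
        return games.stream()
                .flatMap(g -> g.gamers().stream())  // one row per (game, gamer)
                .collect(Collectors.groupingBy(Gamer::username,
                        Collectors.summingLong(Gamer::delta)));
    }

    public static void main(String[] args) {
        Game g = new Game(List.of(new Gamer("lza", -60), new Gamer("lze", 180)));
        // Two identical games: lza loses 120 in total, lze wins 360
        System.out.println(deltaSumPerUser(List.of(g, g)));
    }
}
```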

January 6, 2019 · 2 min · jiezi

A Complete Study of Java Concurrency Options

Concurrency vs. parallelism

- Concurrency is a concept in the problem domain: the program must be designed to handle multiple events that happen at (or almost at) the same time.
- Parallelism is a concept in the solution domain: speed up a problem by executing several of its parts in parallel.

Processes, threads, and coroutines

All three are mechanisms for parallel execution.

- Process: a running instance of an application — launching a game or opening a program starts a process. A process owns code, open file handles, data resources, and an independent address space. Starting one is expensive; a machine typically runs at most a few hundred processes.
- Thread: threads belong to a process and do the actual work. A process has at least a main thread and may have more child threads. Each thread has its own stack space. A process can typically run a few thousand threads. The thread is the smallest unit the operating system can schedule.
- Coroutine: coroutines belong to a thread. They are not managed by the operating system but entirely by the program, so one thread can run tens of thousands or even millions of them — at the cost of the biggest change to how code is written.

Parallel execution in Java

Threads in the JVM:

- Main thread: a thread with an independent life cycle.
- Daemon thread: created by the main thread and ends when its creator ends.

Thread states: note that a thread does not enter the running state the moment start() is called — it becomes "runnable", and the operating system decides which runnable thread actually runs.

Threads in Jetty: web servers manage their own thread pools; the lightweight Jetty, for example, has three kinds of threads: Acceptor, Selector, and Worker.

The original multithreading tool — the Thread class

Extending the class vs. implementing the interface: you can either extend Thread or implement Runnable; in practice implementing the interface is clearly better, since it avoids the single-inheritance limit.

Runnable vs. Callable:

- Runnable: implement run(), which cannot throw checked exceptions; a runtime exception ends the child thread but cannot be caught by the main thread, so the child thread should handle all of its own exceptions.
- Callable: implement call(), which can throw checked exceptions that the main thread can catch — though the main thread still cannot catch runtime exceptions, and they will not interrupt it. Use Callable when you need a return value.

An object implementing Callable must be wrapped in a RunnableFuture before a new thread can execute it, and RunnableFuture itself still implements Runnable. FutureTask is the implementation class of the RunnableFuture interface, which extends both Future and Runnable. Example code using Future:

```java
public class Callee implements Callable<Person> {

    AtomicInteger counter = new AtomicInteger(0);
    private Integer seq = null;

    public Callee() {
        super();
    }

    public Callee(int seq) {
        this.seq = seq;
    }

    /**
     * call() may throw checked exceptions.
     */
    @Override
    public Person call() throws InterruptedException {
        Person p = new Person("person" + counter.incrementAndGet(), RandomUtil.random(0, 150));
        System.out.println("In thread(" + seq + "), create a Person: " + p.toString());
        Thread.sleep(1000);
        return p;
    }
}
```

```java
Callee callee1 = new Callee();
FutureTask<Person> ft = new FutureTask<Person>(callee1);
Thread thread = new Thread(ft);
thread.start();
try {
    thread.join();
} catch (InterruptedException e) {
    e.printStackTrace();
    return;
}
System.out.println("ft.isDone: " + ft.isDone());
Person result1;
try {
    result1 = ((Future<Person>) ft).get();
} catch (InterruptedException e) {
    e.printStackTrace();
    result1 = null;
} catch (ExecutionException e) {
    e.printStackTrace();
    result1 = null;
}
Person result = result1;
System.out.println("main thread get result: " + result.toString());
```

Thread scheduling

- Thread.yield(): moves the current thread back to the runnable state (not blocked), giving other threads of the same priority a chance to run.
- Thread.sleep(long millis): really does block the current thread until the time elapses.
- join() on a thread object: blocks the current thread until the target thread finishes.
- interrupt() on a thread object: do not assume it interrupts the thread! It only sends an interruption signal so a thread stuck in an indefinite wait (e.g. a deadlock) can throw an exception and exit — and if you swallow that exception, the thread still will not stop.
- Object.wait(): the thread waits until another thread calls notify() or notifyAll() on the same object. Because this state is tied to the object's lock, it is a method of Object.
- Object.notify(): wakes one of the threads waiting on this object's monitor; if several are waiting, one is chosen arbitrarily. The awakened thread cannot proceed until the current thread releases the lock on the object, and then it competes in the usual way with every other thread actively synchronizing on the object. notifyAll() is similar but wakes all threads waiting on the monitor.

Synchronization and locks

Memory consistency errors — this counter is not thread-safe:

```java
public class Counter {
    private int c = 0;

    public void increment() {
        c++;
    }

    public void decrement() {
        c--;
    }

    public int value() {
        return c;
    }
}
```

volatile:

```java
public class Foo {
    private int x = -1;
    private volatile boolean v = false;

    public void setX(int x) {
        this.x = x;
        v = true;
    }

    public int getX() {
        if (v == true) {
            return x;
        }
        return 0;
    }
}
```

The volatile keyword keeps the variable out of registers and prevents reordering of accesses to it — but only for reads and writes of the variable itself. "Non-atomic" operations like ++ or -- are still not safe under multiple threads.

Atomic types

The JDK wraps the primitive types into atomic types (Atomic Variables), ideal for counters:

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {
    private AtomicInteger c = new AtomicInteger(0);

    public void increment() {
        c.incrementAndGet();
    }

    public void decrement() {
        c.decrementAndGet();
    }

    public int value() {
        return c.get();
    }
}
```

The implementation of the atomic operations differs before and after Java 8.

Java 7:

```java
public final int getAndIncrement() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return current;
    }
}
```

Java 8:

```java
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
```

The underlying Compare-and-Swap and Fetch-and-Add algorithms rely on machine-level hardware support.

Thread-safe collections

- BlockingQueue: a first-in-first-out structure that blocks or times out when you add to a full queue or take from an empty one.
- ConcurrentMap: a subinterface of java.util.Map that defines useful atomic operations — remove or replace a pair only if the key is present, add only if it is absent. Making these operations atomic avoids explicit synchronization. Its standard implementation is ConcurrentHashMap, the concurrent counterpart of HashMap.
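To make the contrast concrete: with AtomicInteger, N threads doing M increments each always end at exactly N × M, which a plain `int c; c++` counter cannot guarantee under contention. A runnable check (the thread and iteration counts here are arbitrary):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// 10 threads x 10_000 increments: AtomicInteger always ends at exactly 100_000.
public class AtomicDemo {

    public static int countWithAtomic(int threads, int perThread) {
        AtomicInteger c = new AtomicInteger(0);
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < perThread; j++) c.incrementAndGet();
            });
            ts.add(t);
            t.start();
        }
        for (Thread t : ts) {
            try {
                t.join();  // wait for every increment to finish
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return c.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithAtomic(10, 10_000)); // 100000
    }
}
```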
- ConcurrentNavigableMap: a subinterface of ConcurrentMap that supports approximate matching. Its standard implementation is ConcurrentSkipListMap, the concurrent counterpart of TreeMap.

ThreadLocal — variables only the owning thread can reach

The basic idea of a ThreadLocal variable is that the object it holds (for ThreadLocal<String>, a String variable) has a different copy — in fact a different instance — in each Thread. A few points to note:

- Each Thread has its own instance copy that only the current thread can use; that is where the name ThreadLocal comes from.
- Since every thread has its own instance that other threads cannot reach, there is no cross-thread sharing to worry about.
- The difference from an ordinary variable is that each thread using it initializes a completely independent instance copy. ThreadLocal variables are usually declared private static.
- When a thread ends, all the ThreadLocal instance copies it used become collectable.

In short, ThreadLocal fits when each thread needs its own independent instance that is used across several methods — isolated between threads but shared across methods and classes. The same effect can be achieved by other means; ThreadLocal just makes the implementation cleaner.

The synchronized keyword

- Lock on an instance method: the lock is actually on the object, not the method; it is simply checked before the method starts.
- Lock on a static method: the lock is on the class, and it does not interfere with locks on instances.
- Lock on a code block: again the lock is on the named object and checked before the block starts. An object has only one lock, so block locks and instance-method locks affect each other.

Whether synchronized is on a method or a block, the lock acquired is always an object — never a piece of code or a function. Synchronized methods may well be reached through other threads; each object has exactly one lock associated with it, and careless locking can cause deadlock.

Thread pools (Java 5)

Purpose: real-world multithreading starts with thread pools, and Callable tasks are mostly run by pools as well.

Using a pool:

```java
ExecutorService pool = Executors.newFixedThreadPool(3);

Callable<Person> worker1 = new Callee();
Future ft1 = pool.submit(worker1);
Callable<Person> worker2 = new Callee();
Future ft2 = pool.submit(worker2);
Callable<Person> worker3 = new Callee();
Future ft3 = pool.submit(worker3);

System.out.println("about to ask the pool to shut down...");
pool.shutdown();
System.out.println("pool shutdown requested");
try {
    pool.awaitTermination(2L, TimeUnit.SECONDS);
    System.out.println("pool fully terminated");
} catch (InterruptedException e) {
    e.printStackTrace();
}
```

Problems a thread pool solves:

- Task queuing: the number of threads that can run concurrently is always limited, but the number of tasks can be huge.
- Thread scheduling: creating threads is expensive, so a pool keeps live threads around.
- Result collection: each task's result must be gathered in one place.

Pool types:

- newSingleThreadExecutor: a single-threaded pool — all tasks run serially, in submission order; if the one thread dies from an exception, a new thread replaces it.
- newFixedThreadPool: a fixed-size pool — a thread is created per submitted task until the maximum is reached, after which the size stays constant; threads that die from exceptions are replaced.
- newCachedThreadPool: a cacheable pool — idle threads (60 seconds without a task) are reclaimed, and new threads are added intelligently as the task count grows, with no upper bound beyond what the OS/JVM can create.
- newScheduledThreadPool: an unbounded pool supporting delayed and periodic task execution.
- newSingleThreadScheduledExecutor: a single-threaded pool supporting delayed and periodic task execution.

Pool states: before construction (new) the pool is in its initial state; once constructed it enters RUNNING. Strictly speaking no thread starts at construction — threads start on "pre-start" or when tasks arrive — but the pool is running and ready to accept tasks. While running, the state changes through shutdown() or shutdownNow(). shutdown() is a graceful close: no new tasks are accepted, already-submitted tasks run to completion (including queued ones that have not started), and the pool is in SHUTDOWN. shutdownNow() is an immediate close: no new tasks, all running and queued tasks are cancelled, and the pool is in STOP. Once either completes, the pool enters TERMINATED and is finished. isTerminating() covers the SHUTDOWN and STOP states; isShutdown() covers every non-RUNNING state (SHUTDOWN/STOP/TERMINATED). Tasks the pool cannot accept are handled by its task rejection policy.

The Fork/Join model (Java 7)

Purpose: compute-intensive tasks — ideally with little I/O waiting, no sleeping, and an algorithm that is naturally recursive.

Analysis: within a given number of threads, maximize CPU utilization without overloading other resources (such as memory) or leaving many threads idle. ForkJoinPool exists to solve problems with the divide-and-conquer algorithm — quicksort being the classic example. The point is that ForkJoinPool handles a huge number of tasks with relatively few threads. To sort 10 million items, the task is split into two 5-million sorts plus a merge of the two halves; each 5-million sort is split the same way, and a threshold stops the splitting — say, below 10 elements, switch to insertion sort. In the end there are roughly 2,000,000+ tasks.

The crux is that a task can only run to completion after all of its subtasks complete. A ThreadPoolExecutor cannot handle this divide-and-conquer pattern, because its threads cannot enqueue a subtask and suspend the current task until that subtask completes. With ForkJoinPool, a thread can create a new task, suspend the current one, and pick a subtask from the queue to execute.

The key methods are fork() and join(). Threads in a ForkJoinPool use an internal queue to order the tasks and subtasks they must execute. As for performance: ForkJoinPool can complete an enormous number of parent/child tasks with very few threads — say, over 2 million tasks with 4 threads. That is impossible for ThreadPoolExecutor, because its threads cannot prioritize subtasks; completing 2 million tasks with parent/child dependencies would need 2 million threads, which is clearly infeasible. Note that ForkJoinPool creates a great many subtasks as it runs, generating GC pressure — something to watch for.

How it is used: ForkJoinPool implements ExecutorService, so it is a special kind of thread pool. After creating one, call submit(ForkJoinTask<T> task) or invoke(ForkJoinTask<T> task) to run a task. ForkJoinTask represents a task that can be forked and joined; it is an abstract class with two abstract subclasses: RecursiveAction (no return value) and RecursiveTask (with a return value). A design wart, in my view: ForkJoinTask is an abstract class rather than an interface, so in practice business classes must extend either RecursiveAction or RecursiveTask, which constrains them.

The classic example — summing an array:

```java
public interface Calculator {
    long sumUp(long[] numbers);
}

public class ForkJoinCalculator implements Calculator {

    private ForkJoinPool pool;

    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // When fewer than 6 numbers remain, compute the result directly
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            // Otherwise split the task in two and recurse
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle + 1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public ForkJoinCalculator() {
        // The common pool also works: pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
    }
}
```

The example shows that once the array is split small enough (< 6), no further parallelism is needed; larger arrays are halved and processed separately.

Stream (Java 8)

Concept: not to be confused with I/O streams — think of it as declarative syntax for collection processing, similar to SQL for databases. It does share one property with I/O: a stream can be consumed only once and cannot be reused. An example:

```java
int sum = widgets.stream().filter(w -> w.getColor() == RED)
                 .mapToInt(w -> w.getWeight())
                 .sum();
```

Any stream can be made parallel once, after which the subsequent operations run in parallel:

```java
Stream<String> stream = Stream.of("a", "b", "c", "d", "e", "f", "g");
String str = stream.parallel().reduce((a, b) -> a + "," + b).get();
System.out.println(str);
```

Stream operations

Creating streams:

- Collection.stream()
- Collection.parallelStream()
- Arrays.stream(T array) or Stream.of()
- java.io.BufferedReader.lines()
- java.util.stream.IntStream.range()
- java.nio.file.Files.walk()
- java.util.Spliterator
- Random.ints()
- BitSet.stream()
- Pattern.splitAsStream(java.lang.CharSequence)
- JarFile.stream()

Examples:

```java
// 1. Individual values
Stream stream = Stream.of("a", "b", "c");
// 2. Arrays
String[] strArray = new String[] {"a", "b", "c"};
stream = Stream.of(strArray);
stream = Arrays.stream(strArray);
// 3. Collections
List<String> list = Arrays.asList(strArray);
stream = list.stream();
```

Note that for the primitive numeric types there are currently three dedicated stream types: IntStream, LongStream, and DoubleStream. Stream<Integer>, Stream<Long>, and Stream<Double> would work too, but boxing and unboxing are costly, hence the dedicated types.

Intermediate operations: a stream may be followed by zero or more intermediate operations. Each one opens the stream, applies some mapping/filtering, and returns a new stream for the next operation to use. They are all lazy: merely calling them does not start traversing the stream. Known intermediate operations: map (mapToInt, flatMap, etc.), filter, distinct, sorted, peek, limit, skip, parallel, sequential, unordered.

Terminal operations: a stream can have only one terminal operation; after it runs the stream is "used up" and cannot be operated on again, so it must be the last operation. Only a terminal operation actually starts the traversal, producing a result or a side effect. Known terminal operations: forEach, forEachOrdered, toArray, reduce, collect, min, max, count, anyMatch, allMatch, noneMatch, findFirst, findAny, iterator.

reduce, explained: reduce is fundamentally an aggregation — it produces one result from the stream's elements, so accumulation and string concatenation are natural fits. Its three parameters are:

- the initial value of the final result (may be an empty object);
- the accumulator, a two-argument function whose first argument is the previous aggregation result and whose second is the current element;
- the combiner for partial results — when the stream runs in parallel, the aggregation proceeds in segments, and this function describes how the segments come together.

collect: collect is more powerful than reduce — reduce can only produce a value of the same type as the stream's data, whereas collect's result can be any object. The simple form of collect also takes three parameters:

- the result container to return;
- the function that folds an element into the container;
- the function that merges partial containers.

Two collect examples:

```java
// Concatenating characters, same as the reduce version
String concat = stringStream.collect(StringBuilder::new, StringBuilder::append, StringBuilder::append).toString();
// Equivalent to the above, spelled out for clarity
String concat = stringStream.collect(() -> new StringBuilder(), (l, x) -> l.append(x), (r1, r2) -> r1.append(r2)).toString();

// Collecting a stream into a List
Stream stream = Stream.of(1, 2, 3, 4).filter(p -> p > 2);
List result = stream.collect(() -> new ArrayList<>(), (list, item) -> list.add(item), (one, two) -> one.addAll(two));
/* or with method references */
result = stream.collect(ArrayList::new, List::add, List::addAll);
```

Coroutines

A coroutine (fiber) is even more lightweight than a thread: just as a process can own many threads, a thread can own many coroutines. Coroutines are, in essence, the language runtime (or a framework) scheduling around operations that need to wait, to keep the CPU fully used. The same effect is achievable with callbacks, but deeply nested callbacks are monstrous to write — hence the coroutine style. In theory multiple coroutines never truly run "at the same time", so operations on shared variables stay deterministic and need no locks (to be confirmed).

Python, Golang, and C# have coroutine syntax built in, but Java does not; it relies on frameworks, commonly Quasar, kilim, and ea-async. A Java ea-async example:

```java
import static com.ea.async.Async.await;
import static java.util.concurrent.CompletableFuture.completedFuture;

public class Store {
    // Buy an item: takes an item type id and a price
    public CompletableFuture<Boolean> buyItem(String itemTypeId, int cost) {
        // Debit the bank (long-running operation)
        if (!await(bank.decrement(cost))) {
            return completedFuture(false);
        }
        try {
            // Ship the item from inventory (long-running operation)
            await(inventory.giveItem(itemTypeId));
            return completedFuture(true);
        } catch (Exception ex) {
            await(bank.refund(cost));
            throw new AppException(ex);
        }
    }
}
```

References

- "Seven Concurrency Models in Seven Weeks" (e-book)
- Java Concurrency in depth: thread pools
- Java multithreading study notes (exhaustive summary)
- Jetty fundamentals: the threading model
- Source analysis of jetty-server's high-performance multithreading
- Java programming essentials: Concurrency explained
- Java Concurrency in Depth (Part 1)
- Java advanced (7): correctly understanding ThreadLocal's principle and use cases
- Java concurrency notes: how to use ForkJoinPool, and how it works
- An introduction to ForkJoinPool
- Multithreading with ForkJoinPool
- The Java 8 Streams API explained
- Implementing coroutines in Java
- Comic: what is a coroutine?
- Reading the source code

...
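Without ea-async, the same buyItem flow is normally written as an explicit CompletableFuture chain — roughly what await() gets rewritten into. A hedged sketch with toy stand-ins for the bank and inventory (none of these names come from a real API):

```java
import java.util.concurrent.CompletableFuture;

// CompletableFuture chaining: roughly what ea-async's await() rewrites the
// buyItem example into. decrement/giveItem here are toy stand-ins.
public class StoreDemo {

    static CompletableFuture<Boolean> decrement(int cost) {
        return CompletableFuture.completedFuture(cost <= 100); // pretend balance is 100
    }

    static CompletableFuture<Void> giveItem(String itemTypeId) {
        return CompletableFuture.completedFuture(null);
    }

    public static CompletableFuture<Boolean> buyItem(String itemTypeId, int cost) {
        return decrement(cost).thenCompose(ok -> {
            if (!ok) return CompletableFuture.completedFuture(false);
            return giveItem(itemTypeId).thenApply(v -> true);
        });
    }

    public static void main(String[] args) {
        System.out.println(buyItem("sword", 50).join());  // true
        System.out.println(buyItem("sword", 500).join()); // false
    }
}
```

The chain is short here, but each extra await in the original corresponds to another thenCompose level — which is exactly the nesting the coroutine style is meant to flatten.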

January 4, 2019 · 4 min · jiezi