乐趣区

关于阿里云:一种新的流为-Java-加入生成器Generator特性

作者:文镭(依来)

前言

这篇文章不是工具举荐,也不是利用案例分享。其主题思想,是介绍 一种全新的设计模式。它既领有形象的数学美感,仅仅从一个简略接口登程,就能推演出宏大的个性汇合,引出许多全新概念。同时也有扎实的工程实用价值,由其实现的工具,性能均可显著超过同类的头部开源产品。

这一设计模式并非因 Java 而生,而是诞生于一个非常简陋的脚本语言。它对语言个性的要求十分之低,因此其价值对泛滥古代编程语言都是普适的。

对于 Stream

首先大略回顾下 Java 里传统的流式 API。自 Java8 引入 lambda 表达式和 Stream 以来,Java 的开发便捷性有了质的飞跃,Stream 在简单业务逻辑的解决上让人效率倍增,是每一位 Java 开发者都应该把握的根底技能。但排除掉 parallelStream 也即并发流之外,它其实并不是一个好的设计。

第一、封装过重,实现过于简单,源码极其难读。我能了解这或者是为了兼容并发流所做的斗争,但毕竟耦合太深,显得艰深晦涩。每一位初学者被源码吓到之后,想必都会产生流是一种非常高级且实现简单的个性的印象。实际上并不是这样,流其实能够用非常简单的形式构建

第二、API 过于简短。简短体现在 stream.collect 这一部分。作为比照,Kotlin 提供的 toList/toSet/associate(toMap)等等丰盛操作是能够间接作用在流上的。Java 直到 16 才抠抠索索加进来一个 Stream 能够间接调用的 toList,他们甚至不肯把 toSet/toMap 一起加上。

第三、API 性能简陋。对于链式操作,在最后的 Java8 里只有 map/filter/skip/limit/peek/distinct/sorted 这七个,Java9 又加上了 takeWhile/dropWhile。然而在 Kotlin 中,除了这几个之外人还有许多额定的实用功能。

例如:

mapIndexed,mapNotNull,filterIndexed,filterNotNull,onEachIndexed,distinctBy, sortedBy,sortedWith,zip,zipWithNext 等等,翻倍了不止。这些货色实现起来并不简单,就是个棘手的事,但对于用户而言有和没有的体验差别堪称微小。

在这篇文章里,我将提出一种全新的机制用于构建流。这个机制极其简略,任何能看懂 lambda 表达式 (闭包) 的同学都能亲手实现,任何反对闭包的编程语言都能利用该机制实现本人的流。也正是因为这个机制足够简略,所以开发者能够以相当低的老本撸出大量的实用 API,应用体验甩开 Stream 两条街,不是问题。

对于生成器

生成器 (Generator)[1] 是许多古代编程语言里一个广受好评的重要个性,在 Python/Kotlin/C#/Javascript 等等语言中均有间接反对。它的外围 API 就是一个 yield 关键字(或者办法)。

有了生成器之后,无论是 iterable/iterator,还是一段乌七八糟的闭包,都能够间接映射为一个流。举个例子,假如你想实现一个下划线字符串转驼峰的办法,在 Python 里你能够利用生成器这么玩

def underscore_to_camelcase(s):
    def camelcase():
        yield str.lower
        while True:
            yield str.capitalize

    return ''.join(f(sub) for sub, f in zip(s.split('_'), camelcase()))

这短短几行代码能够说处处体现出了 Python 生成器的奇妙。首先,camelcase 办法里呈现了 yield 关键字,解释器就会将其看作是一个生成器,这个生成器会首先提供一个 lower 函数,而后提供有数的 capitalize 函数。因为生成器的执行始终是 lazy 的,所以用 while true 的形式生成有限流是非常常见的伎俩,不会有性能或者内存上的节约。其次,Python 里的流是能够和 list 一起进行 zip 的,无限的 list 和有限的流 zip 到一起,list 完结了流天然也会完结。

这段代码中,开端那行 join()括号里的货色,Python 称之为生成器推导(Generator Comprehension)[2],其本质上仍然是一个流,一个 zip 流被 map 之后的 string 流,最终通过 join 办法聚合为一个 string。

以上代码里的操作,在任何反对生成器的语言里都能够轻易实现,然而在 Java 里你恐怕连想都不敢想。Java 有史以来,无论是历久弥新的 Java8,还是最新的引入了 Project Loom[3]的 OpenJDK19,连协程都有了,仍然没有间接反对生成器。

实质上,生成器的实现要依赖于 continuation[4]的挂起和复原,所谓 continuation 能够直观了解为程序执行到指定地位后的断点,协程就是指在这个函数的断点挂起后跳到另一个函数的某个断点继续执行,而不会阻塞线程,生成器亦如是。

Python 通过栈帧的保留与复原实现函数重入以及生成器 [5],Kotlin 在编译阶段利用 CPS(Continuation Passing Style)[6] 技术对字节码进行了变换,从而在 JVM 上模仿了协程[7]。其余的语言要么大体如此,要么有更间接的反对。

那么,有没有一种方法,能够在没有协程的 Java 里,实现或者至多模拟出一个 yield 关键字,从而动静且高性能地创立流呢。答案是,有。

注释

Java 里的流叫 Stream,Kotlin 里的流叫 Sequence。我切实想不出更好的名字了,想叫 Flow 又被用了,简略起见权且叫 Seq。

概念定义

首先给出 Seq 的接口定义

public interface Seq<T> {void consume(Consumer<T> consumer);
}

它实质上就是一个 consumer of consumer,其实在含意我后边会讲。这个接口看似形象,实则十分常见,java.lang.Iterable 人造自带了这个接口,那就是大家耳熟能详的 forEach。利用办法推导,咱们能够写出第一个 Seq 的实例

List<Integer> list = Arrays.asList(1, 2, 3);
Seq<Integer> seq = list::forEach;

能够看到,在这个例子里consume 和 forEach 是齐全等价的,事实上这个接口我最早就是用 forEach 命名的,几轮迭代之后才改成含意更精确的 consume。

利用单办法接口在 Java 里会自动识别为 FunctionalInteraface 这一平凡个性,咱们也能够用一个简略的 lambda 表达式来结构流,比方只有一个元素的流。

static <T> Seq<T> unit(T t) {return c -> c.accept(t);
}

这个办法在数学上很重要(实操上其实用的不多),它定义了 Seq 这个泛型类型的单位元操作,即 T -> Seq<T> 的映射。

map 与 flatMap

map

从 forEach 的直观角度登程,咱们很容易写出 map[8],将类型为 T 的流,转换为类型为 E 的流,也即依据函数 T -> E 失去 Seq<T> -> Seq<E> 的映射。

default <E> Seq<E> map(Function<T, E> function) {return c -> consume(t -> c.accept(function.apply(t)));
}

flatMap

同理,能够持续写出 flatMap,行将每个元素开展为一个流之后再合并。

default <E> Seq<E> flatMap(Function<T, Seq<E>> function) {return c -> consume(t -> function.apply(t).consume(c));
}

大家能够本人在 IDEA 里写写这两个办法,联合智能提醒,写起来其实十分不便。如果你感觉了解起来不太直观,就把 Seq 看作是 List,把 consume 看作是 forEach 就好。

filter 与 take/drop

map 与 flatMap 提供了流的映射与组合能力,流还有几个外围能力:元素过滤与中断管制。

filter

过滤元素,实现起来也很简略

default Seq<T> filter(Predicate<T> predicate) {
    return c -> consume(t -> {if (predicate.test(t)) {c.accept(t);
        }
    });
}

take

流的中断管制有很多场景,take 是最常见的场景之一,即获取前 n 个元素,前面的不要——等价于 Stream.limit。

因为 Seq 并不依赖 iterator,所以必须通过异样实现中断。为此须要构建一个全局单例的专用异样,同时勾销这个异样对调用栈的捕捉,以缩小性能开销(因为是全局单例,不勾销也没关系)

public final class StopException extends RuntimeException {public static final StopException INSTANCE = new StopException();

    @Override
    public synchronized Throwable fillInStackTrace() {return this;}
}

以及相应的办法

static <T> T stop() {throw StopException.INSTANCE;}

default void consumeTillStop(C consumer) {
    try {consume(consumer);
    } catch (StopException ignore) {}}

而后就能够实现 take 了:

default Seq<T> take(int n) {
    return c -> {int[] i = {n};
        consumeTillStop(t -> {if (i[0]-- > 0) {c.accept(t);
            } else {stop();
            }
        });
    };
}

drop

drop 是与 take 对应的概念,抛弃前 n 个元素——等价于 Stream.skip。它并不波及流的中断管制,反而更像是 filter 的变种,一种带有状态的 filter。察看它和下面 take 的实现细节,外部随着流的迭代,存在一个计数器在一直刷新状态,但这个计数器并不能为外界感知。这里其实曾经能体现出流的洁净个性,它哪怕携带了状态,也丝毫不会外露。

default Seq<T> drop(int n) {
    return c -> {int[] a = {n - 1};
        consume(t -> {if (a[0] < 0) {c.accept(t);
            } else {a[0]--;
            }
        });
    };
}

其余 API

onEach

对流的某个元素增加一个操作 consumer,然而不执行流——对应 Stream.peek。

default Seq<T> onEach(Consumer<T> consumer) {return c -> consume(consumer.andThen(c));
}

zip

流与一个 iterable 元素两两聚合,而后转换为一个新的流——在 Stream 里没有对应,但在 Python 里有同名实现。

default <E, R> Seq<R> zip(Iterable<E> iterable, BiFunction<T, E, R> function) {
    return c -> {Iterator<E> iterator = iterable.iterator();
        consumeTillStop(t -> {if (iterator.hasNext()) {c.accept(function.apply(t, iterator.next()));
            } else {stop();
            }
        });
    };
}

终端操作

下面实现的几个办法都是流的链式 API,它们将一个流映射为另一个流,但流自身仍然是 lazy 或者说尚未真正执行的。真正执行这个流须要应用所谓终端操作,对流进行生产或者聚合。在 Stream 里,生产就是 forEach,聚合就是 Collector。对于 Collector,其实也能够有更好的设计,这里就不开展了。不过为了示例,能够先简略疾速实现一个 join。

default String join(String sep) {StringJoiner joiner = new StringJoiner(sep);
    consume(t -> joiner.add(t.toString()));
    return joiner.toString();}

以及 toList。

default List<T> toList() {List<T> list = new ArrayList<>();
    consume(list::add);
    return list;
}

至此为止,咱们仅仅只用几十行代码,就实现出了一个五脏俱全的流式 API。在大部分状况下,这些 API 曾经能笼罩百分之八九十的应用场景。你齐全能够如法炮制,在其余编程语言里照着玩一玩,比方 Go(笑)。

生成器的推导

本文尽管从题目开始就在讲生成器,甚至毫不夸大的说生成器才是最外围的个性,但等到把几个外围的流式 API 写完了,仍然没有解释生成器到底是咋回事——其实倒也不是我在卖关子,你只有仔细观察一下,生成器早在最开始讲到 Iterable 天生就是 Seq 的时候,就曾经呈现了。

List<Integer> list = Arrays.asList(1, 2, 3);
Seq<Integer> seq = list::forEach;

没看进去?那把这个办法推导改写为一般 lambda 函数,有

Seq<Integer> seq = c -> list.forEach(c);

再进一步,把这个 forEach 替换为更传统的 for 循环,有

Seq<Integer> seq = c -> {for (Integer i : list) {c.accept(i);
    }
};

因为已知这个 list 就是[1, 2, 3],所以以上代码能够进一步等价写为

Seq<Integer> seq = c -> {c.accept(1);
    c.accept(2);
    c.accept(3);
};

是不是有点眼生?无妨看看 Python 里相似的东西长啥样:

def seq():
    yield 1
    yield 2
    yield 3

二者绝对比,模式简直能够说截然不同——这其实就曾经是生成器了,这段代码里的 accept 就表演了 yield 的角色,consume 这个接口之所以取这个名字,含意就是指它是一个生产操作,所有的终端操作都是基于这个生产操作实现的。性能上看,它齐全等价于 Iterable 的 forEach,之所以又不间接叫 forEach,是因为 它的元素并不是自身自带的,而是通过闭包内的代码块长期生成的

这种生成器,并非传统意义上利用 continuation 挂起的生成器,而是利用闭包来捕捉代码块里长期生成的元素,哪怕没有挂起,也能高度模仿传统生成器的用法和个性。其实上文所有链式 API 的实现,实质上也都是生成器,只不过生成的元素来自于原始的流罢了。

有了生成器,咱们就能够把前文提到的下划线转驼峰的操作用 Java 也如法炮制写进去了。

static String underscoreToCamel(String str) {
    // Java 没有首字母大写办法,轻易现写一个
    UnaryOperator<String> capitalize = s -> s.substring(0, 1).toUpperCase() + s.substring(1).toLowerCase();
     // 利用生成器结构一个办法的流
    Seq<UnaryOperator<String>> seq = c -> {
        // yield 第一个小写函数
        c.accept(String::toLowerCase);
        // 这里 IDEA 会告警,提醒死循环危险,忽视即可
        while (true) {
            // 按需 yield 首字母大写函数
            c.accept(capitalize);
        }
    };
    List<String> split = Arrays.asList(str.split("_"));
    // 这里的 zip 和 join 都在上文给出了实现
    return seq.zip(split, (f, sub) -> f.apply(sub)).join("");
}

大家能够把这几段代码拷下来跑一跑,看它是不是真的实现了其指标性能。

生成器的实质

尽管曾经推导出了生成器,但仿佛还是有点摸不着头脑,这两头到底产生了什么,死循环是咋跳出的,怎么就能生成元素了。为了进一步解释,这里再举一个大家相熟的例子。

生产者 - 消费者模式

生产者与消费者的关系不止呈现在多线程或者协程语境下,在单线程里也有一些经典场景。比方 A 和 B 两名同学单干一个我的项目,别离开发两个模块:A 负责产出数据,B 负责应用数据。A 不关怀 B 怎么解决数据,可能要先过滤一些,进行聚合后再做计算,也可能是写到某个本地或者近程的存储;B 天然也不关怀 A 的数据是怎么来的。这里边惟一的问题在于,数据条数切实是太多了,内存一次性放不下。在这种状况下,传统的做法是让 A 提供一个带回调函数 consumer 的接口,B 在调用 A 的时候传入一个具体的 consumer。

public void produce(Consumer<String> callback) {
    // do something that produce strings
    // then use the callback consumer to eat them
}

这种基于回调函数的交互方式切实是过于经典了,本来没啥可多说的。然而在曾经有了生成器之后,咱们无妨胆子放大一点略微做一下革新:仔细观察下面这个 produce 接口,它输出一个 consumer,返回 void——咦,所以它其实也是一个 Seq 嘛!

Seq<String> producer = this::produce;

接下来,咱们只须要略微调整下代码,就能对这个本来基于回调函数的接口进行一次降级,将它变成一个生成器。

public Seq<String> produce() {
    return c -> {
        // still do something that produce strings
        // then use the callback consumer to eat them
    };
}

基于这一层形象,作为生产者的 A 和作为消费者的 B 就真正做到齐全的、彻底的解耦了。A 只须要把数据生产过程放到生成器的闭包里,期间波及到的所有副作用,例如 IO 操作等,都被这个闭包齐全隔离了。B 则间接拿到一个干干净净的流,他不须要关怀流的外部细节,当然想关怀也关怀不了,他只用专一于本人想做的事件即可。

更重要的是,A 和 B 尽管在操作逻辑上齐全解耦,相互不可见,但在 CPU 调度工夫上它们却是彼此交织的,B 甚至还能间接阻塞、中断 A 的生产流程——能够说没有协程,胜似协程。

至此,咱们终于胜利发现了 Seq 作为生成器的真正实质 :consumer of callback。明明是一个回调函数的消费者,摇身一变就成了生产者,切实是有点微妙。不过认真一想倒也正当:可能满足消费者需要(callback) 的家伙,不论这需要有如许奇怪,可不就是生产者么。

容易发现,基于 callback 机制的生成器,其调用开销齐全就只有生成器闭包外部那堆代码块的执行开销,加上一点点微不足道的闭包创立开销。在诸多波及到流式计算与管制的业务场景里,这将带来极为显著的内存与性能劣势。前面我会给出展示其性能劣势的具体场景实例。

另外,察看这段革新代码,会发现 produce 输入的货色,基本就还是个函数,没有任何数据被真正执行和产出。这就是生成器作为一个匿名接口的天生劣势:惰性计算——消费者看似失去了整个流,理论那只是一张爱的号码牌,能够涂写,能够废除,但只有在拿着货真价实的 callback 去兑换的那一刻,才会真正的执行流。

生成器的实质,正是人类实质的背面:鸽子克星——没有任何人能够鸽它

IO 隔离与流输入

Haskell 创造了所谓 IO Monad[9]来将 IO 操作与纯函数的世界隔离。Java 利用 Stream,勉强做到了相似的封装成果。以 java.io.BufferedReader 为例,将本地文件读取为一个 Stream<String>,能够这么写:

Stream<String> lines = new BufferedReader(new InputStreamReader(new FileInputStream("file"))).lines();

如果你认真查看一下这个 lines 办法的实现,会发现它应用了大段代码去创立了一个 iterator,而后才将其转变为 stream。暂且不提它的实现有如许繁琐,这里首先应该留神的是 BufferedReader 是一个 Closeable,平安的做法是在应用结束后 close,或者利用 try-with-resources 语法包一层,实现主动 close。然而 BufferedReader.lines 并没有去敞开这个源,它是一个不那么平安的接口——或者说,它的隔离是不残缺的。Java 对此也打了个补丁,应用 java.nio.file.Files.lines,它会增加加一个 onClose 的回调 handler,确保 stream 耗尽后执行敞开操作。

那么有没有更普适做法呢,毕竟不是所有人都分明 BufferedReader.lines 和 Files.lines 会有这种安全性上的区别,也不是所有的 Closeable 都能提供相似的平安敞开的流式接口,甚至大概率压根就没有流式接口。

好在当初咱们有了 Seq,它的闭包个性自带隔离副作用的先天劣势。凑巧在波及大量数据 IO 的场景里,利用 callback 交互又是极为经典的设计形式——这里几乎就是它大展拳脚的最佳舞台。

用生成器实现 IO 的隔离非常简单,只须要整个包住 try-with-resources 代码即可,它同时就包住了 IO 的整个生命周期。

Seq<String> seq = c -> {try (BufferedReader reader = Files.newBufferedReader(Paths.get("file"))) {
        String s;
        while ((s = reader.readLine()) != null) {c.accept(s);
        }
    } catch (Exception e) {throw new RuntimeException(e);
    }
};

外围代码其实就 3 行,构建数据源,挨个读数据,而后 yield(即 accept)。后续对流的任何操作看似产生在创立流之后,理论执行起来都被包进了这个 IO 生命周期的外部,读一个生产一个,彼此交替,随用随走。

换句话讲,生成器的 callback 机制,保障了哪怕 Seq 能够作为变量到处传递,但波及到的任何副作用操作,都是包在同一个代码块里惰性执行的。它不须要像 Monad 那样,还得定义诸如 IOMonad,StateMonad 等等花色泛滥的 Monad。

与之相似,这里无妨再举个阿里中间件的例子,利用 Tunnel 将大家相熟的 ODPS 表数据下载为一个流:

public static Seq<Record> downloadRecords(TableTunnel.DownloadSession session) {
    return c -> {long count = session.getRecordCount();
        try (TunnelRecordReader reader = session.openRecordReader(0, count)) {for (long i = 0; i < count; i++) {c.accept(reader.read());
            }
        } catch (Exception e) {throw new RuntimeException(e);
        }
    };
}

有了 Record 流之后,如果再能实现出一个 map 函数,就能够十分不便的将 Record 流 map 为带业务语义的 DTO 流——这其实就等价于一个 ODPS Reader。

异步流

基于 callback 机制的生成器,除了能够在 IO 畛域大展拳脚,它人造也是亲和异步操作的。毕竟一听到回调函数这个词,很多人就能条件反射式的想到异步,想到 Future。一个 callback 函数,它的命运就决定了它是不会在乎本人被放到哪里、被怎么应用的。比方说,丢给某个暴力的异步逻辑:

public static Seq<Integer> asyncSeq() {
    return c -> {CompletableFuture.runAsync(() -> c.accept(1));
        CompletableFuture.runAsync(() -> c.accept(2));
    };
}

这就是一个简略而粗犷的异步流生成器。对于内部使用者来说,异步流除了不能保障元素程序,它和同步流没有任何区别,实质上都是一段可运行的代码,边运行边产生数据。 一个 callback 函数,谁给用不是用呢。

并发流

既然给谁用不是用,那么给 ForkJoinPool 用如何?——Java 赫赫有名的 parallelStream 就是基于 ForkJoinPool 实现的。咱们也能够拿来搞一个本人的并发流。具体做法很简略,把下面异步流示例里的 CompletableFuture.runAsync 换成 ForkJoinPool.submit 即可,只是要额定留神一件事:parallelStream 最终执行后是要阻塞的(比方最罕用的 forEach),它并非单纯将工作提交给 ForkJoinPool,而是在那之后还要做一遍 join。

对此咱们无妨采纳最为暴力而简略的思路,结构一个 ForkJoinTask 的 list,顺次将元素提交 forkJoinPool 后,产生一个 task 并增加进这个 list,等所有元素全副提交结束后,再对这个 list 里的所有 task 对立 join。

default Seq<T> parallel() {ForkJoinPool pool = ForkJoinPool.commonPool();
    return c -> map(t -> pool.submit(() -> c.accept(t))).cache().consume(ForkJoinTask::join);
}

这就是 基于生成器的并发流 ,它的实现仅仅只须要两行代码——正如本文开篇所说, 流能够用非常简单的形式构建。哪怕是 Stream 费了老大劲的并发流,换一种形式,实现起来能够简略到令人发指。

这里值得再次强调的是,这种机制并非 Java 限定,而是 任何反对闭包的编程语言都能玩 。事实上,这种流机制的最早验证和实现,就是我在 AutoHotKey_v2[10] 这个软件自带的简陋的脚本语言上实现的。

再谈生产者 - 消费者模式

后面为了解释生成器的 callback 实质,引入了单线程下的生产者 - 消费者模式。那在实现了异步流之后,事件就更有意思了。

回忆一下,Seq 作为一种两头数据结构,可能齐全解耦生产者与消费者,一方只管生产数据交给它,另一方只管从它那里拿数据生产。这种结构有没有感觉有点眼生?不错,正是Java 开发者常见的阻塞队列,以及反对协程的语言里的通道(Channel),比方 Go 和 Kotlin。

通道某种意义上也是一种阻塞队列,它和传统阻塞队列的次要区别,在于当通道里的数据超出限度或为空时,对应的生产者 / 消费者会挂起而不是阻塞,两种形式都会暂停生产 / 生产,只是协程挂起后能让出 CPU,让它去别的协程里持续干活。

那 Seq 相比 Channel 有什么劣势呢?劣势可太多了:首先,生成器闭包里 callback 的代码块,严格确保了生产和生产必然交替执行,也即严格的先进先出、进了就出、不进不出,所以不须要独自开拓堆内存去保护一个队列,那没有队列天然也就没有锁,没有锁天然也就没有阻塞或挂起。其次,Seq 实质上是生产监听生产,没有生产天然没有生产,如果生产过剩了——啊,生产永远不会过剩,因为 Seq 是惰性的,哪怕生产者在那儿 while 死循环有限生产,也不过是个司空见惯的有限流罢了。

这就是生成器的另一种了解形式,一个无队列、无锁、无阻塞的通道。Go 语言 channel 常被诟病的死锁和内存泄露问题,在 Seq 身上压根就不存在;Kotlin 搞进去的异步流 Flow 和同步流 Sequence 这两套大同小异的 API,都能被 Seq 对立替换。

能够说,没有比 Seq 更平安的通道实现了,因为基本就没有平安问题。生产了没有生产?Seq 原本就是惰性的,没有生产,那就啥也不会生产。生产完了没有敞开通道?Seq 原本就不须要敞开——一个 lambda 而已有啥好敞开的。

为了更直观的了解,这里给一个简略的通道示例。先轻易实现一个基于 ForkJoinPool 的异步生产接口,该接口容许用户自由选择生产完后是否 join。

default void asyncConsume(Consumer<T> consumer) {ForkJoinPool pool = ForkJoinPool.commonPool();
    map(t -> pool.submit(() -> consumer.accept(t))).cache().consume(ForkJoinTask::join);
}

有了异步生产接口,立马就能够演示出 Seq 的通道性能。

@Test
public void testChan() {
    // 生产有限的自然数,放入通道 seq,这里流自身就是通道,同步流还是异步流都无所谓
    Seq<Long> seq = c -> {
        long i = 0;
        while (true) {c.accept(i++);
        }
    };
    long start = System.currentTimeMillis();
    // 通道 seq 交给消费者,消费者示意只有偶数,只有 5 个
    seq.filter(i -> (i & 1) == 0).take(5).asyncConsume(i -> {
        try {Thread.sleep(1000);
            System.out.printf("produce %d and consume\n", i);
        } catch (InterruptedException e) {throw new RuntimeException(e);
        }
    });
    System.out.printf("elapsed time: %dms\n", System.currentTimeMillis() - start);
}

运行后果

produce 0 and consume
produce 8 and consume
produce 6 and consume
produce 4 and consume
produce 2 and consume
elapsed time: 1032ms

能够看到,因为生产是并发执行的,所以哪怕每个元素的生产都要花 1 秒钟,最终总体耗时也就比 1 秒多一点点。当然,这和传统的通道模式还是不太一样,比方理论工作线程就有很大区别。更全面的设计是在流的根底上加上无锁非阻塞队列实现正经 Channel,能够附带解决 Go 通道的许多问题同时晋升性能,前面我会另写文章专门探讨。

生成器的利用场景

上文介绍了生成器的实质个性,它是一个 consumer of callback,它能够以闭包的模式完满封装 IO 操作,它能够无缝切换为异步流和并发流,并在异步交互中表演一个无锁的通道角色。除去这些外围个性带来的劣势外,它还有十分多乏味且有价值的利用场景。

树遍历

一个 callback 函数,它的命运就决定了它是不会在乎本人被放到哪里、被怎么应用的,比如说,放进递归里。而递归的一个典型场景就是树遍历。作为比照,无妨先看看在 Python 里怎么利用 yield 遍历一棵二叉树的:

def scan_tree(node):
    yield node.value
    if node.left:
        yield from scan_tree(node.left)
    if node.right:
        yield from scan_tree(node.right)

对于 Seq,因为 Java 不容许函数外部套函数,所以要略微多写一点。外围原理其实很简略,把 callback 函数丢给递归函数,每次递归记得捎带上就行。

//static <T> Seq<T> of(T... ts) {//    return Arrays.asList(ts)::forEach;
//}

// 递归函数
public static <N> void scanTree(Consumer<N> c, N node, Function<N, Seq<N>> sub) {c.accept(node);
    sub.apply(node).consume(n -> {if (n != null) {scanTree(c, n, sub);
        }
    });
}

// 通用办法,能够遍历任何树
public static <N> Seq<N> ofTree(N node, Function<N, Seq<N>> sub) {return c -> scanTree(c, node, sub);
}

// 遍历一个二叉树
public static Seq<Node> scanTree(Node node) {return ofTree(node, n -> Seq.of(n.left, n.right));
}

这里的 ofTree 就是一个十分弱小的树遍历办法。遍历树自身并不是啥稀奇货色,但把遍历的过程输入为一个流,那设想空间就很大了。在编程语言的世界里树的结构能够说到处都是。比方说,咱们能够非常简略的结构出一个遍历 JSONObject 的流。

static Seq<Object> ofJson(Object node) {
    return Seq.ofTree(node, n -> c -> {if (n instanceof Iterable) {((Iterable<?>)n).forEach(c);
        } else if (n instanceof Map) {((Map<?, ?>)n).values().forEach(c);
        }
    });
}

而后剖析 JSON 就会变得非常不便,比方你想校验某个 JSON 是否存在 Integer 字段,不论这个字段在哪一层。应用流的 any/anyMatch 这样的办法,一行代码就能搞定:

boolean hasInteger = ofJson(node).any(t -> t instanceof Integer);

这个办法的厉害之处不仅在于它足够简略,更在于它是一个短路操作。用失常代码在一个深度优先的递归函数里执行短路,要不就抛出异样,要不就额定增加一个上下文参数参加递归(只有在返回根节点后能力进行),总之实现起来都挺麻烦。然而应用 Seq,你只须要一个 any/all/none。

再比方你想校验某个 JSON 字段里是否存在非法字符串“114514”,同样也是一行代码:

boolean isIllegal = ofJson(node).any(n -> (n instanceof String) && ((String)n).contains("114514"));

对了,JSON 的前辈 XML 也是树的构造,联合泛滥成熟的 XML 的解析器,咱们也能够实现出相似的流式扫描工具。比如说,更快的 Excel 解析器?

更好用的笛卡尔积

笛卡尔积对大部分开发而言可能用途不大,但它在函数式语言中是一种颇为重要的结构,在运筹学畛域构建最优化模型时也极其常见。此前 Java 里若要利用 Stream 构建多重笛卡尔积,须要多层 flatMap 嵌套。

public static Stream<Integer> cartesian(List<Integer> list1, List<Integer> list2, List<Integer> list3) {return list1.stream().flatMap(i1 ->
        list2.stream().flatMap(i2 ->
            list3.stream().map(i3 -> 
                i1 + i2 + i3)));
}

对于这样的场景,Scala 提供了一种语法糖,容许用户以 for 循环 +yield[11]的形式来组合笛卡尔积。不过 Scala 的 yield 就是个纯语法糖,与生成器并无间接关系,它会 在编译阶段 将代码翻译为下面 flatMap 的模式。这种糖模式上等价于 Haskell 里的 do annotation[12]。

好在当初有了生成器,咱们有了更好的抉择,能够在 不减少语法、不引入关键字、不麻烦编译器的前提下,间接写个嵌套 for 循环并输入为流。且模式更为自在——你能够在 for 循环的任意一层随便增加代码逻辑。

public static Seq<Integer> cartesian(List<Integer> list1, List<Integer> list2, List<Integer> list3) {
    return c -> {for (Integer i1 : list1) {for (Integer i2 : list2) {for (Integer i3 : list3) {c.accept(i1 + i2 + i3);
                }
            }
        }
    };
}

换言之,Java 不须要这样的糖。Scala 或者本来也能够不要。

可能是 Java 下最快的 CSV/Excel 解析器

我在前文屡次强调生成器将带来显著的性能劣势,这一观点除了有实践上的撑持,也有明确的工程实际数据,那就是我为 CSV 家族所开发的架构对立的解析器。所谓 CSV 家族除了 CSV 以外,还包含 Excel 与阿里云的 ODPS,其实只有模式合乎其对立范式,就都能进入这个家族。

然而对于 CSV 这一家子的解决其实始终是 Java 语言里的一个痛点。ODPS 就不说了,如同压根就没有。CSV 的库尽管很多,但如同都不是很让人称心,要么 API 繁琐,要么性能低下,没有一个的位置能与 Python 里的 Pandas 等量齐观。其中绝对出名一点的有 OpenCSV[13],Jackson 的 jackson-dataformat-csv[14],以及号称最快的 univocity-parsers[15]。

Excel 则不一样,有团体开源软件 EasyExcel[16]珠玉在前,我只能确保比它快,很难也不打算比它性能笼罩全。

对于其中的 CsvReader 实现,因为市面上相似产品切实太多,我也没精力挨个去比,我只能说反正它比公开号称最快的那个还要快不少——大略一年前我实现的 CsvReader 在我办公电脑上的速度最多只能达到 univocity-parsers 的 80%~90%,不管怎么优化也死活拉不下来。直到起初我发现了生成器机制并对其重构之后,速度间接 反超前者 30% 到 50%,成为我已知的相似开源产品里的最快实现。

对于 Excel,在给定的数据集上,我实现的 ExcelReader 比 EasyExcel快 50%~55%,跟 POI 就懒得比了。测试详情见以上链接。

注:最近和 Fastjson 作者高铁有很多交换,在暂未正式公布的 Fastjson2 的 2.0.28-SNAPSHOT 版本 上,其 CSV 实现的性能在多个 JDK 版本上曾经根本追平我的实现。出于谨严,我只能说我的实现在本文公布之前可能是已知最快的哈哈。

革新 EasyExcel,让它能够间接输入流

下面提到的 EasyExcel 是阿里开源的出名产品,功能丰富,品质优良,广受好评。恰好它自身又一个利用回调函数进行 IO 交互的经典案例,倒是也非常适合拿来作为例子讲讲。依据官网示例,咱们能够结构一个最简略的基于回调函数的 excel 读取办法

public static <T> void readEasyExcel(String file, Class<T> cls, Consumer<T> consumer) {
    EasyExcel.read(file, cls, new PageReadListener<T>(list -> {for (T person : list) {consumer.accept(person);
        }
    })).sheet().doRead();
}

EasyExcel 的应用是通过回调监听器来捕捉数据的。例如这里的 PageReadListener,外部有一个 list 缓存。缓存满了,就喂给回调函数,而后持续刷缓存。这种基于回调函数的做法确实非常经典,然而不免有一些不不便的中央:

  1. 消费者须要关怀生产者的外部缓存,比方这里的缓存就是一个 list。
  2. 消费者如果想拿走全副数据,须要放一个 list 进去挨个 add 或者每次 addAll。这个操作是非惰性的。
  3. 难以把读取过程转变为 Stream,任何流式操作都必须要用 list 存完并转为流后,能力再做解决。灵活性很差。
  4. 消费者不不便干涉数据生产过程,比方达到某种条件 (例如个数) 后间接中断,除非你在实现回调监听器时把这个逻辑 override 进去[17]。

利用生成器,咱们能够将下面示例中读取 excel 的过程齐全关闭起来,消费者不须要传入任何回调函数,也不须要关怀任何外部细节——间接拿到一个流就好。革新起来也相当简略,主体逻辑一成不变,只须要把那个 callback 函数用一个 consumer 再包一层即可:

public static <T> Seq<T> readExcel(String pathName, Class<T> head) {
    return c -> {ReadListener<T> listener = new ReadListener<T>() {
            @Override
            public void invoke(T data, AnalysisContext context) {c.accept(data);
            }

            @Override
            public void doAfterAllAnalysed(AnalysisContext context) {}};
        EasyExcel.read(pathName, head, listener).sheet().doRead();
    };
}

这一革新我曾经给 EasyExcel 官网提了 PR[18],不过不是输入 Seq,而是基于生成器原理构建的 Stream,后文会有构建形式的具体介绍。

更进一步的,齐全能够将对 Excel 的解析过程革新为生成器形式,利用一次性的 callback 调用防止外部大量状态的存储与批改,从而带来可观的性能晋升。这一工作因为要依赖上文 CsvReader 的一系列 API,所以临时没法提交给 EasyExcel。

用生成器构建 Stream

生成器作为一种全新的设计模式,诚然能够提供更为弱小的流式 API 个性,然而毕竟不同于大家最为相熟 Stream,总会有个适应老本或者迁徙老本。对于既有的曾经成熟的库而言,应用 Stream 仍然是对用户最为负责的抉择。值得庆幸的是,哪怕机制齐全不同,Stream 和 Seq 仍是高度兼容的。

首先,不言而喻,就如同 Iterable 那样,Stream 人造就是一个 Seq:

Stream<Integer> stream = Stream.of(1, 2, 3);
Seq<Integer> seq = stream::forEach;

那反过来 Seq 是否转化为 Stream 呢?在 Java Stream 提供的官网实现里,有一个 StreamSupport.stream 的结构工具,能够帮忙用户将一个 iterator 转化为 stream。针对这个入口,咱们其实能够用生成器来结构一个非标准的 iterator:不实现 hastNext 和 next,而是独自重载 forEachRemaining 办法,从而 hack 进 Stream 的底层逻辑——在那迷宫个别的源码里,有一个十分隐秘的角落,一个叫 AbstractPipeline.copyInto 的办法,会在真正执行流的时候调用 Spliterator 的 forEachRemaining 办法来遍历元素——尽管这个办法本来是通过 next 和 hasNext 实现的,但当咱们把它重载之后,就能够做到假狸猫换真太子。

public static <T> Stream<T> stream(Seq<T> seq) {Iterator<T> iterator = new Iterator<T>() {
        @Override
        public boolean hasNext() {throw new NoSuchElementException();
        }

        @Override
        public T next() {throw new NoSuchElementException();
        }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {seq.consume(action::accept);
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
        false);
}

也就是说,咱当初甚至能用生成器来结构 Stream 了!比方:

public static void main(String[] args) {
    Stream<Integer> stream = stream(c -> {c.accept(0);
        for (int i = 1; i < 5; i++) {c.accept(i);
        }
    });
    System.out.println(stream.collect(Collectors.toList()));
}

图灵在上,感激 Stream 的作者没有偷这个懒,没有用 while hasNext 来进行遍历,不然这操作咱还真玩不了。

当然因为这里的 Iterator 实质曾经产生了扭转,这种操作也会有一些限度,没法再应用 parallel 办法将其转为并发流,也不能用 limit 办法限度数量。不过除此以外,像 map, filter, flatMap, forEach, collect 等等办法,只有不波及流的中断,都能够失常应用。

有限递推数列

理论利用场景不多。Stream 的 iterate 办法能够反对单个种子递推的有限数列,但两个乃至多个种子的递推就无能为力了,比方最受程序员青睐的炫技专用斐波那契数列:

public static Seq<Integer> fibonaaci() {
    return c -> {
        int i = 1, j = 2;
        c.accept(i);
        c.accept(j);
        while (true) {c.accept(j = i + (i = j));
        }
    };
}

另外还有一个比拟有意思的利用,利用法里树的个性,进行丢番图迫近[22],简而言之,就是用有理数迫近实数。这是一个非常适合拿来做 demo 的且足够乏味的例子,限于篇幅起因我就不开展了,有机会另写文章探讨。

流的更多个性

流的聚合

如何设计流的聚合接口是一个很简单的话题,若要认真探讨简直又能够整出大几千字,限于篇幅这里简略提几句好了。在我看来,好的流式 API 应该要让流自身能间接调用聚合函数,而不是像 Stream 那样,先用 Collectors 结构一个 Collector,再用 stream 去调用 collect。能够比照下以下两种形式,孰优孰劣高深莫测:

Set<Integer> set1 = stream.collect(Collectors.toSet());
String string1 = stream.map(Integer::toString).collect(Collectors.joinning(","));

Set<Integer> set2 = seq.toSet();
String string2 = seq.join(",", Integer::toString);

这一点上,Kotlin 做的比 Java 好太多。不过无利往往也有弊,从函数接口而非用户应用的角度来说,Collector 的设计其实更为齐备,它对于流和 groupBy 是同构的:所有能用 collector 对流间接做到的事件,groupBy 之后用雷同的 collector 也能做到,甚至 groupBy 自身也是一个 collector。

所以更好的设计是既保留函数式的齐备性与同构性,同时也提供由流间接调用的快捷方式。为了阐明,这里举一个 Java 和 Kotlin 都没有实现但需要很广泛的例子,求加权均匀:

public static void main(String[] args) {Seq<Integer> seq = Seq.of(1, 2, 3, 4, 5, 6, 7, 8, 9);

    double avg1 = seq.average(i -> i, i -> i); // = 6.3333
    double avg2 = seq.reduce(Reducer.average(i -> i, i -> i)); // = 6.3333
    Map<Integer, Double> avgMap = seq.groupBy(i -> i % 2, Reducer.average(i -> i, i -> i)); // = {0=6.0, 1=6.6}
    Map<Integer, Double> avgMap2 = seq.reduce(Reducer.groupBy(i -> i % 2, Reducer.average(i -> i, i -> i)));
}

下面代码里的 average,Reducer.average,以及用在 groupBy 里的 average 都是齐全同构的,换句话说,同一个 Reducer,能够间接用在流上,也能够对流进行分组之后用在每一个子流上。这是一套相似 Collector 的 API,既解决了 Collector 的一些问题,同时也能提供更丰盛的个性。重点是,这玩意儿是凋谢的,且机制足够简略,谁都能写。

流的分段解决

分段解决其实是始终以来各种流式 API 的一个盲点,不论是 map 还是 forEach,咱们偶然会心愿前半截和后半截采取不同的解决逻辑,或者更间接一点的说心愿第一个元素非凡解决。对此,我提供了三种 API,元素替换 replace,分段 map,以及分段生产 consume。

还是以前文提到的下划线转驼峰的场景作为一个典型例子:在将下划线字符串 split 之后,对第一个元素应用 lowercase,对剩下的其余元素应用 capitalize。应用分段的 map 函数,能够更疾速的实现这一个性能。

static String underscoreToCamel(String str, UnaryOperator<String> capitalize) {
    // split=> 分段 map=>join
    return Seq.of(str.split("_")).map(capitalize, 1, String::toLowerCase).join("");
}

再举个例子,当你解析一个 CSV 文件的时候,对于存在表头的状况,在解析时就要别离解决:利用表头信息对字段重排序,残余的内容则按行转为 DTO。应用适当的分段解决逻辑,这一看似麻烦的操作是能够在一个流里一次性实现的。

一次性流还是可重用流?

相熟 Stream 的同学应该分明,Stream 是一种一次性的流,因为它的数据来源于一个 iterator,二次调用一个曾经用完的 Stream 会抛出异样。Kotlin 的 Sequence 则采纳了不同的设计理念,它的流来自于 Iterable,大部分状况下是可重用的。然而 Kotlin 在读文件流的时候,采纳的仍然是和 Stream 同样的思路,将 BufferedReader 封装为一个 Iterator,所以也是一次性的。

不同于以上二者,生成器的做法显然要更为灵便,流是否可重用,齐全取决于被生成器包进去的数据源是否可重用。比方下面代码里不论是本地文件还是 ODPS 表,只有数据源的构建是在生成器里边实现的,那天然就是可重用的。你能够像应用一个一般 List 那样,屡次应用同一个流。从这个角度上看,生成器自身就是一个 Immutable,它的元素生产,间接来自于代码块,不依赖于运行环境,不依赖于内存状态数据。对于任何消费者而言,都能够期待同一个生成器给出始终统一的流。

生成器的实质和人类一样,都是复读机

当然,复读机复读也是要看老本的,对于像 IO 这种高开销的流须要重复使用的场景,重复去做同样的 IO 操作必定不合理,咱们无妨设计出一个 cache 办法用于流的缓存。

最罕用的缓存形式,是将数据读进一个 ArrayList。因为 ArrayList 自身并没有实现 Seq 的接口,所以无妨造一个 ArraySeq,它既是 ArrayList,又是 Seq——正如我后面屡次提到的,List 人造就是 Seq。

public class ArraySeq<T> extends ArrayList<T> implements Seq<T> {
    @Override
    public void consume(Consumer<T> consumer) {forEach(consumer);
    }
}

有了 ArraySeq 之后,就能够立马实现流的缓存

default Seq<T> cache() {ArraySeq<T> arraySeq = new ArraySeq<>();
    consume(t -> arraySeq.add(t));
    return arraySeq;
}

仔细的敌人可能会留神到,这个 cache 办法我在后面结构并发流的时候曾经用到了。除此以外,借助 ArraySeq,咱们还能轻易的实现流的排序,感兴趣的敌人能够自行尝试。

二元流

既然能够用 consumer of callback 作为机制来构建流,那么有意思的问题来了,如果这个 callback 不是 Consumer 而是个 BiConsumer 呢?——答案就是,二元流!

public interface BiSeq<K, V> {void consume(BiConsumer<K, V> consumer);
}

二元流是一个全新概念,此前任何基于迭代器的流,比方 Java Stream,Kotlin Sequence,还有 Python 的生成器,等等等等,都玩不了二元流。我倒也不是针对谁,毕竟在座诸位的 next 办法都必须吐出一个对象实例,意味着即使想结构同时有两个元素的流,也必须包进一个 Pair 之类的构造体里——故而其本质上仍然是一个一元流。当流的元素数量很大时,它们的内存开销将非常显著。

哪怕是看起来最像二元流的 Python 的 zip:

for i, j in zip([1, 2, 3], [4, 5, 6]):
    pass

这里的 i 和 j,理论仍是对一个 tuple 进行解包之后的后果。

然而基于 callback 机制的二元流和它们齐全不一样,它和一元流是等同轻量的!这就意味着节俭内存同时还快。比方我在实现 CsvReader 时,重写了 String.split 办法使其输入为一个流,这个流与 DTO 字段 zip 为二元流,就能实现值与字段的一对一匹配。不须要借助下标,也不须要创立长期数组或 list 进行存储。每一个被宰割进去的 substring,在整个生命周期里都是一次性的,随用随丢。

这里额定值得一提的是,同 Iterable 相似,Java 里的 Map 天生就是一个二元流。

Map<Integer, String> map = new HashMap<>();
BiSeq<Integer, String> biSeq = map::forEach;

有了基于 BiConsumer 的二元流,天然也能够有基于 TriConsumer 三元流,四元流,以及基于 IntConsumer、DoubleConsumer 等原生类型的流等等。这是一个真正的流的大家族,里边甚至还有很多不同于一元流的非凡操作,这里就不过多开展了,只提一个:

二元流和三元流乃至多元流,能够在 Java 里结构出货真价实的惰性元组 tuple。当你的函数须要返回多个返回值的时候,除了手写一个 Pair/Triple,你当初有了更好的抉择,就是用生成器的形式间接返回一个 BiSeq/TriSeq,这比间接的元组还额定减少了的惰性计算的劣势,能够在真正须要应用的时候再用回调函数去生产。你甚至连空指针查看都省了。

结束语

首先感激你能读到这里,我要讲的故事大体曾经讲完了,尽管还有许多称得上乏味的细节没放进去探讨,但曾经不影响这个故事的完整性了。我想要再次强调的是,下面这所有的内容,代码也好,个性也好,案例也罢,包含我所实现的 CsvReader 系列——全副都衍生自这一个简略接口,它是所有的源头,是梦开始的中央,齐全值得我在文末再写一遍

public interface Seq<T> {void consume(Consumer<T> consumer);
}

对于这个神奇的接口,我愿称之为:

道生一——先有 Seq 定义

毕生二——导出 Seq 一体两面的个性,既是流,又是生成器

二生三——由生成器实现出丰盛的流式 API,而后导出可平安隔离的 IO 流,最终导出异步流、并发流以及通道个性

至于三生万物的局部,还会有后续文章,期待能早日对外开源吧。

附录

附录的本来内容蕴含 API 文档,援用地址,以及性能 benchmark。因为暂未开源,这里仅介绍下 Monad 相干。

Monad

Monad[24]是来自于领域论里的一个概念,同时也是函数式编程语言代表者 Haskell 里极为重要的一种设计模式。但它无论是对流还是对生成器而言都不是必须的,所以放在附录讲。

我之所以要提 Monad,是因为 Seq 在实现了 unit, flatMap 之后,天然也就成为了一种 Monad。对于关注相干实践的同学来说,如果连提都不提,可能会有些好受。遗憾的是,尽管 Seq 在模式上是个 Monad,但它们在理念上是存在一些抵触的。比方说在 Monad 里至关重要的 flatMap,既是外围定义之一,还承当着组合与拆包两大重要性能。甚至连 map 对 Monad 来说都不是必须的,它齐全能够由 flatMap 和 unit 推导进去(推导过程见下文),反之还不行。然而对于流式 API 而言,map 才是真正最为要害和高频的操作,flatMap 反而没那么重要,甚至压根都不太罕用。

Monad 这种设计模式之所以被推崇备至,是因为它有几个重要个性,惰性求值、链式调用以及副作用隔离——在纯函数的世界里,后者甚至称得上是性命攸关的小事。然而对包含 Java 在内的大部分失常语言来说,实现惰性求值更间接的形式是面向接口而不是面向对象 (实例) 编程,接口因为没有成员变量,天生就是惰性的。链式操作则是流的天生个性,毋庸赘述。至于副作用隔离,这同样不是 Monad 的专利。生成器用闭包 +callback 的形式也能做到,前文都有介绍。

推导 map 的实现

首先,map 能够由 unit 与 flatMap 间接组合失去,这里无妨称之为 map2:

default <E> Seq<E> map2(Function<T, E> function) {return flatMap(t -> unit(function.apply(t)));
}

即把类型为 T 的元素,转变为类型为 E 的 Seq,再用 flatMap 合并。这个是最直观的,不须要流的先验概念,是 Monad 的固有属性。当然其在效率上必定很差,咱们能够对其化简。

已知 unit 与 flatMap 的实现

static <T> Seq<T> unit(T t) {return c -> c.accept(t);
}

default <E> Seq<E> flatMap(Function<T, Seq<E>> function) {return c -> supply(t -> function.apply(t).supply(c));
}

先开展 unit,代入下面 map2 的实现,有

default <E> Seq<E> map3(Function<T, E> function) {return flatMap(t -> c -> c.accept(function.apply(t)));
}

把这个 flatMap 里边的函数提出来变成 flatFunction,再开展 flatMap,有

default <E> Seq<E> map4(Function<T, E> function) {Function<T, Seq<E>> flatFunction = t -> c -> c.accept(function.apply(t));
    return consumer -> supply(t -> flatFunction.apply(t).supply(consumer));
}

容易留神到,这里的 flatFunction 间断有两个箭头,它其实就齐全等价于一个双参数 (t, c) 函数的柯里化 currying。咱们对其做逆柯里化操作,反推出这个双参数函数:

Function<T, Seq<E>> flatFunction = t -> c -> c.accept(function.apply(t));
// 等价于
BiConsumer<T, Consumer<E>> biConsumer = (t, c) -> c.accept(function.apply(t));

能够看到,这个等价的双参数函数其实就是一个 BiConsumer,再将其代入 map4,有

default <E> Seq<E> map5(Function<T, E> function) {BiConsumer<T, Consumer<E>> biConsumer = (t, c) -> c.accept(function.apply(t));
    return c -> supply(t -> biConsumer.accept(t, c));
}

留神到,这里 biConsumer 的实参和形参是完全一致的,所以能够将它的办法体代入下边间接替换,于是有

default <E> Seq<E> map6(Function<T, E> function) {return c -> supply(t -> c.accept(function.apply(t)));
}

到这一步,这个 map6,就和前文从流式概念登程间接写进去的 map 完全一致了。证毕!

参考链接:

[1]https://en.wikipedia.org/wiki/Generator_(computer_programming)

[2]https://www.pythonlikeyoumeanit.com/Module2_EssentialsOfPytho…

[3]https://openjdk.org/projects/loom/

[4]https://en.wikipedia.org/wiki/Continuation

[5]https://hackernoon.com/the-magic-behind-python-generator-func…

[6]https://en.wikipedia.org/wiki/Continuation-passing_style

[7]https://kotlinlang.org/spec/asynchronous-programming-with-cor…

[8]https://zh.wikipedia.org/wiki/Map_(%E9%AB%98%E9%98%B6%E5%87%BD%E6%95%B0)

[9]https://crypto.stanford.edu/~blynn/haskell/io.html

[10]https://www.autohotkey.com/docs/v2/

[11]https://stackoverflow.com/questions/1052476/what-is-scalas-yield

[12]https://stackoverflow.com/questions/10441559/scala-equivalent-of-haskells-do-notation-yet-again

[13]https://opencsv.sourceforge.net/

[14]https://github.com/FasterXML/jackson-dataformats-text/tree/master/csv

[15]https://github.com/uniVocity/univocity-parsers

[16]https://github.com/alibaba/easyexcel

[17]https://github.com/alibaba/easyexcel/issues/1566

[18]https://github.com/alibaba/easyexcel/pull/3052

[20]https://github.com/alibaba/easyexcel/pull/3052

[21]https://github.com/alibaba/fastjson2/blob/f30c9e995423603d5b80f3efeeea229b76dc3bb8/extension/src/main/java/com/alibaba/fastjson2/support/csv/CSVParser.java#L197

[22]https://www.bilibili.com/video/BV1ha41137oW/?is_story_h5=fals…

[24]https://en.wikipedia.org/wiki/Monad_(functional_programming)

更多内容,请点击此处进入云原生技术社区查看

退出移动版