本文首发于泊浮目的简书:https://www.jianshu.com/u/204…
一个风和日丽的下午, 我看着日常看代码做重构迁移, 看到这么段代码:
突然, 我看到了这样的代码:
private void getTopicsDiskSizeForSomeBroker(int brokerID, AdminClient admin, Map<String, Long> topicsSizeMap) throws ExecutionException, InterruptedException {DescribeLogDirsResult ret = admin.describeLogDirs(Collections.singletonList(brokerID));
Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> tmp = ret.all().get();
for (Map.Entry<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> entry : tmp.entrySet()) {Map<String, DescribeLogDirsResponse.LogDirInfo> tmp1 = entry.getValue();
for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> entry1 : tmp1.entrySet()) {DescribeLogDirsResponse.LogDirInfo info = entry1.getValue();
Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoMap = info.replicaInfos;
for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicas : replicaInfoMap.entrySet()) {String topic = replicas.getKey().topic();
Long topicSize = topicsSizeMap.get(topic);
if (topicSize == null) {topicsSizeMap.put(topic, replicas.getValue().size);
} else {topicsSizeMap.put(topic, replicas.getValue().size + topicSize);
}
}
}
}
}
看了这段代码我整个人都不好了!
首先是那火箭式的三个嵌套 for 循环, 再者就是那些变量声明语句. 为了迭代他们, 我们不得不声明它一遍 …
public List<KafkaTopicInfoBO> getTopicDiskSize() {return getTopicPartitionReplicaInfo().entrySet().stream()
.map(e -> new KafkaTopicInfoBO(e.getKey().topic(), e.getValue().size))
.collect(Collectors.toList());
}
protected Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> getTopicPartitionReplicaInfo() {Properties globalConf = zkConfigService.getProperties(ZkPathUtils.GLOBAL_CONFIG);
Properties adminConfig = new Properties();
adminConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConf.getProperty((ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)));
AdminClient adminClient = AdminClient.create(adminConfig);
List<String> brokerIds = zkConfigService.getChildByPath(kafkaIdsPath);
return brokerIds.stream()
.map(Integer::valueOf)
.map(Collections::singletonList)
.map(adminClient::describeLogDirs)
.map(DescribeLogDirsResult::all)
.map(mapKafkaFuture -> {
try {return mapKafkaFuture.get();
} catch (Exception e) {throw new RuntimeException(e);
}
})
.map(Map::values)
.flatMap(Collection::stream)
.map(Map::values)
.flatMap(Collection::stream)
.map(e -> e.replicaInfos)
.map(Map::entrySet)
.flatMap(Collection::stream)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
这样看起来似乎好了一点. 但是对于不熟悉 函数式编程 的同学来说, 理解以上代码还是有点困难的.
接下来, 先来简单讲一讲函数式编程.
就像来自数学中的代数
f(x)=5x^2+4x+3
g(x)=2f(x)+5=10x^2+8x+11
h(x)=f(x)+g(x)=15x^2+12x+14
函数式编程定义输入数据和输出数据相关的关系——数学表达式里面其实是在做一种映射(Mapping), 输入的数据和输出的数据关系是什么样的, 就是用来函数定义的.
public class Quint{public static void main (String args[]){for (int i=0; i<25; i++){System.out.println(i*i);
}
}
}
(println (take 25 (map (fn [x] (*x x) (range)))))
简单解释一下上段 Lisp
代码:
- range 函数回返回一个从 0 开始的整数无穷列表
- 然后该列表会被传入 map, 针对列表中的每个元素, 调用平方值的匿名函数, 产生了一个无穷多的, 包含平方值的列表
- 将列表传入 take 函数, 仅仅返回前 25 个
- println 将接入的参数输出
protected fun getTopicPartitionReplicaInfo(): Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> {val globalConf = zkConfigService.getProperties(ZkPathUtils.GLOBAL_CONFIG)
val adminConfig = Properties()
adminConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConf.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
val adminClient = AdminClient.create(adminConfig)
val brokerIds = zkConfigService.getChildByPath(kafkaIdsPath)
return brokerIds.stream()
.mapToInt {Integer.valueOf(it)
}.let { intStream ->
adminClient.describeLogDirs(intStream.boxed().collect(Collectors.toList()))
}.let { describeLogDirsResult ->
describeLogDirsResult.all()}.let { mapKafkaFutrue ->
mapKafkaFutrue.get()}.let { mapStream ->
mapStream.values
}.let {it.stream().map {e -> e.values}.flatMap {e -> e.stream() }.collect(Collectors.toList())
}.flatMap {it.replicaInfos.entries.toList()
}.let { it ->
it.associateBy({it.key}, {it.value})
}
}
代码看起来大差不差. 但 Kotlin 的这些关键字写起来更方便. 我们看下 Java 中 map
函数和 Kotlin 中 let
函数的签名:
* Returns a stream consisting of the results of applying the given
* function to the elements of this stream.
*
* <p>This is an <a href="package-summary.html#StreamOps">intermediate
* operation</a>.
*
* @param <R> The element type of the new stream
* @param mapper a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* function to apply to each element
* @return the new stream
*/
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
/**
* Calls the specified function [block] with `this` value as its argument and returns its result.
*
* For detailed usage information see the documentation for [scope functions](https://kotlinlang.org/docs/reference/scope-functions.html#let).
*/
@kotlin.internal.InlineOnly
public inline fun <T, R> T.let(block: (T) -> R): R {
contract {callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return block(this)
}
我们可以看到 Java 中的 map
是被限制在 Stream API 中的, 而 Kotlin 的 let
并没有这种限制.
同时, 我们也可以感受到, 对于函数式编程的支持, 明显是 Kotlin 更好一些——在 Kotlin 中, 我们用一个 ()
就可以表示函数, 而 Java 则需要 Interface 来表示(在 Java 中, 对象是一等公民).
如果读者有兴趣的话, 可以尝试一下 Haskell
或Lisp
(JVM 上叫 Clojure). 这些都是纯函数式语言.
类似,Kotlin 还有很多这种函数, 被称为作用域函数, 在这里罗列一下常用的函数:
- let
- run
- also
- apply
- takeIf
- takeUnless
- repeat
在《架构整洁之道》中, 有这么一个总结:
- 结构化编程是对程序控制权的直接转移的限制
- 面向对象编程是对程序控制权的间接转移的限制
- 函数式编程是对程序赋值操作的限制
如果说面向对象编程是对数据进行抽象, 那么函数式编程则是对行为进行抽象.
- Map
- Reduce
- Filter
举个例子, 面包和蔬菜 map 到切碎的操作上, 再 reduce 成汉堡.
我们可以看到 map 和 reduce 不关心输入数据, 它们只控制, 并不是业务. 控制是描述怎么干, 而业务描述要干什么.
在本文中, 我们只看到了 map 的身影——上面提到了,map 对流中的每一个元素进行操作.
可能会有读者问 let
是啥, 在本文的代码例子中,let
针对整个流进行操作.
简单来说, Map && Reduce
对应了我们日常中用的 循环
, 而Filter
对应了If
-
优势
- 无状态
- 并发无伤害
- 函数执行没有顺序上的问题
-
劣势
- 数据复制严重