共计 2893 个字符,预计需要花费 8 分钟才能阅读完成。
“咱们不短少计算机,短少的是聪慧地应用计算机的办法。”
日常编程的时候,我有时候会不盲目的把计算机当成一个人,以对人谈话的形式来给计算机布置任务。然而,计算机和人类的一个次要区别就是,它会一字不差地执行程序,遇到非凡状况时不会做变通。
比方咱们想统计一个文件里的词频,最直观的形式就是:
File.stream!("path/to/some/file") | |
|> Enum.flat_map(&String.split(&1, " ")) | |
|> Enum.reduce(%{}, fn word, acc -> | |
Map.update(acc, word, 1, & &1 + 1) | |
end) | |
|> Enum.to_list() |
第一行是应用 File.stream!/1
关上文件,它能够让咱们逐行读取文件,这一步不会把文件内容读取进去。第二行就不得了了,会把文件的全部内容都读取到内存中。在这里如果文件过大,有可能间接就撑爆内存了。
File.stream!("path/to/some/file") | |
|> Stream.flat_map(&String.split(&1, " ")) | |
|> Enum.reduce(%{}, fn word, acc -> | |
Map.update(acc, word, 1, & &1 + 1) | |
end) | |
|> Enum.to_list() |
既然 Enum.flat_map/2
太过暴力,咱们就用 Stream.flat_map/2
来代替它,这样,在第二行仍旧不会读取任何文件内容。到第三行的 Enum.reduce/3
这里会开始逐行读取文件内容并且应用一个 hash map 来统计词频。这样做根本不会呈现内存爆炸的状况了。当初的处理器根本都是多核的,咱们能不能把多核处理器利用起来呢?
不便起见,咱们用上面这个列表示意文件的每一行(只管这样就无奈体现出解决大文件的特点了,但咱们只有晓得程序不会一下子读取全部内容到内存就行了)
data = [ | |
"rose are red", | |
"violets are blue" | |
] |
第一步,和 Stream
相似,咱们生成一个 lazy 的 Flow
数据结构:
opts = [stages: 2, max_demand: 1] | |
flow = flow | |
|> Flow.from_enumerable(opts) | |
%Flow{operations: [], | |
options: [stages: 2, max_demand: 1], | |
producers: {:enumerables, [["rose are red", "violets are blue"]]}, | |
window: %Flow.Window.Global{periodically: [], trigger: nil} | |
} |
stages
能够了解为并行的外围数量,实质上是参加并行处理的 gen_stage
过程数量。这里咱们设置为 2,与双核机器上的默认配置雷同。
接下来的 flat_map
和 reduce
操作也很下面的十分相似。
flow = flow | |
|> Flow.flat_map(&String.split/1) | |
|> Flow.reduce(fn -> %{} end, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end) | |
%Flow{ | |
operations: [ | |
{:reduce, #Function<45.65746770/0 in :erl_eval.expr/5>, | |
#Function<43.65746770/2 in :erl_eval.expr/5>}, | |
{:mapper, :flat_map, [&String.split/1]} | |
], | |
options: [stages: 2, max_demand: 1], | |
producers: {:enumerables, [["rose are red", "violets are blue"]]}, | |
window: %Flow.Window.Global{periodically: [], trigger: nil} | |
} |
flow |> Enum.to_list() | |
[{"are", 1}, {"blue", 1}, {"violets", 1}, {"are", 1}, {"red", 1}, {"rose", 1}] |
通过调用立刻执行类的函数,例如 Enum.to_list/1
,Flow
终于才开始理论执行。留神到后果里的 {"are", 1}
呈现了两次,这是为什么呢?
还记得咱们设置的 stages: 2, max_demand: 1
选项吗,这意味着参加解决工作的 stages 数量是 2,且每个 stage 每次最多解决 1 个事件(event)。这样设置的后果就是 “rose are red” 和 “violets are blue” 别离交给了不同的 stage 来解决,最初的后果只会简略地拼合在一起。而要实现最初的合并,会是一个只能单过程执行的操作,这是咱们不愿看到的。
有没有方法在调配事件的时候就防止这个问题呢?如果咱们可能把雷同的事件都调配给同一个 stage,就可能防止最初的合并问题了。应用 hash 来调配事件是极好的,Flow.partition
的作用就是如此。
flow = flow | |
|> Flow.flat_map(&String.split/1) | |
|> Flow.partition(opts) | |
|> Flow.reduce(fn -> %{} end, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end) | |
%Flow{ | |
operations: [ | |
{:reduce, #Function<45.65746770/0 in :erl_eval.expr/5>, | |
#Function<43.65746770/2 in :erl_eval.expr/5>} | |
], | |
options: [stages: 2, max_demand: 1], | |
producers: {:flows, | |
[ | |
%Flow{operations: [{:mapper, :flat_map, [&String.split/1]}], | |
options: [stages: 2, max_demand: 1], | |
producers: {:enumerables, [["rose are red", "violets are blue"]]}, | |
window: %Flow.Window.Global{periodically: [], trigger: nil} | |
} | |
]}, | |
window: %Flow.Window.Global{periodically: [], trigger: nil} | |
} |
能看到在 partition 之后本来的 Flow 被嵌套到了新的 Flow 外部,这也是咱们须要再次传入 opts 的起因。在外部的 Flow 执行结束之后,内部的 Flow 才会接下去执行。这一次,单词依据 hash 被调配到了不同的 stages。(咱们在下面的 Flow 构造中看不到任何对于 hash 的信息,因为这就是事件调配的默认形式)
flow |> Enum.to_list() | |
[{"blue", 1}, {"rose", 1}, {"violets", 1}, {"are", 2}, {"red", 1}] |
胜利地达到了预期,即不再须要额定计算来合并后果。
文中代码来自 https://hexdocs.pm/flow/Flow….