关于go:gozero-mapreduce源码分析和简单实现

9次阅读

共计 1447 个字符,预计需要花费 4 分钟才能阅读完成。

Mapreduce

Mapreduce 是一种分布式并行编程模型,在一个函数或者一次接口调用中会呈现大量的计算或者大量的调用第三方接口的状况。这个时候就能够应用 Mapreduce 这种变成模型,让大量的计算在一台或者多台机器上解决,最终汇总到一起输入后果。

gozero 中的 Mapreduce

gozero 是一个最近比拟风行的 go 微服务框架,然而在这个库中也有一些比拟有意思和好用的类库,咱们能够独自援用,就比方其中的 Mapreduce。
官网是这么说的:在理论的业务场景中咱们经常须要从不同的 rpc 服务中获取相应属性来组装成简单对象。如果是串行调用的话响应工夫会随着 rpc 调用次数呈线性增长,所以咱们要优化性能个别会将串行改并行。
咱们晓得如果本人实现一套并行的模式还是比拟麻烦的,gozero Mapreduce 就能够帮忙咱们非常容易的实现这样的成果。让咱们重点关注本人的业务逻辑的实现。

简略应用

这里须要提一下的时,gozero 中还提供了线程数量的管制。能够让你本人管制并行处理的线程数,免得过多的线程对服务器造成过大的耗费。同时咱们也能够传入本人的 context 来管制整个办法的超时和勾销逻辑。这些都封装在类库中,不须要咱们去操心。只须要传递对的参数就能够了。
上面就是一个比较简单的时候,读取一个数组并行加上“:1”最终输出扭转后的数组对象。

package main

import (
    "context"
    "fmt"
    "github.com/zeromicro/go-zero/core/mr"
    "time"
)

func main() {
    // 要解决的数据
    uid := []string{"a", "b", "c", "d", "e", "f"}
    // 传递数据的逻辑
    generateFunc := func(source chan<- interface{}) {
        for _, v := range uid {
            source <- v
            fmt.Println("source:", v)
        }
    }

    // 解决数据的逻辑
    mapFunc := func(item interface{}, writer mr.Writer, cancel func(err error)) {tmp := item.(string) + ":1"
        writer.Write(tmp)
        fmt.Println("item:", item)
    }

    // 合并的数据逻辑
    reducerFunc := func(pipe <-chan interface{}, writer mr.Writer, cancel func(err error)) {var uid []string
        for v := range pipe {uid = append(uid, v.(string))
            fmt.Println("pipe:", uid)
        }
        writer.Write(uid)
    }

    // 开始并发解决数据
    // 超时工夫
    ctx, cl := context.WithTimeout(context.Background(), time.Second*3)
    res, err := mr.MapReduce(generateFunc, mapFunc, reducerFunc, mr.WithContext(ctx))
    // 开启现成管制超时,如果超时则调用 cl 办法进行所有携程
    go func() {time.Sleep(time.Second * 2)
        fmt.Println("cl")
        cl()}()
    fmt.Println(res, err)
}

源码剖析

首先咱们先看下一整个函数的流程图,这个流程图是我本人的了解,如果有不对的中央请大家在评论区探讨。

正文完
 0