关于go:golang中的errgroup

47次阅读

共计 7135 个字符,预计需要花费 18 分钟才能阅读完成。

0.1、索引

https://waterflow.link/articles/1665239900004

1、串行执行

如果咱们须要查问一个课件列表,其中有课件的信息,还有课件创建者的信息,和课件的缩略图信息。然而此时咱们曾经对服务做了拆分,假如有 课件服务 用户服务 还有 文件服务

咱们通常的做法是,当咱们查问课件列表时,咱们首先调用课件服务,比方查问 10 条课件记录,而后获取到课件的创建人 ID,课件的缩略图 ID;再通过这些创建人 ID 去用户服务查问用户信息,通过缩略图 ID 去文件服务查问文件信息;而后再写到这 10 条课件记录中返回给前端。

像上面这样:

package main

import (
    "fmt"
    "time"
)

type Courseware struct {
    Id         int64
    Name       string
    Code       string
    CreateId   int64
    CreateName string
    CoverId   int64
    CoverPath string
}

type User struct {
    Id   int64
    Name string
}

type File struct {
    Id   int64
    Path string
}

var coursewares []Courseware
var users map[int64]User
var files map[int64]File
var err error

func main() {
    // 查问课件
    coursewares, err = CoursewareList()
    if err != nil {fmt.Println("获取课件谬误")
        return
    }

    // 获取用户 ID、文件 ID
    userIds := make([]int64, 0)
    fileIds := make([]int64, 0)
    for _, courseware := range coursewares {userIds = append(userIds, courseware.CreateId)
        fileIds = append(fileIds, courseware.CoverId)
    }

    // 批量获取用户信息
    users, err = UserMap(userIds)
    if err != nil {fmt.Println("获取用户谬误")
        return
    }

    // 批量获取文件信息
    files, err = FileMap(fileIds)
    if err != nil {fmt.Println("获取文件谬误")
        return
    }

    // 填充
    for i, courseware := range coursewares {if user, ok := users[courseware.CreateId]; ok {coursewares[i].CreateName = user.Name
        }

        if file, ok := files[courseware.CoverId]; ok {coursewares[i].CoverPath = file.Path
        }
    }
    fmt.Println(coursewares)
}

func UserMap(ids []int64) (map[int64]User, error) {time.Sleep(3 * time.Second) // 模仿数据库申请
    return map[int64]User{1: {Id: 1, Name: "liu"},
        2: {Id: 2, Name: "kang"},
    }, nil
}

func FileMap(ids []int64) (map[int64]File, error) {time.Sleep(3 * time.Second) // 模仿数据库申请
    return map[int64]File{1: {Id: 1, Path: "/a/b/c.jpg"},
        2: {Id: 2, Path: "/a/b/c/d.jpg"},
    }, nil
}

func CoursewareList() ([]Courseware, error) {time.Sleep(3 * time.Second)
    return []Courseware{{Id: 1, Name: "课件 1", Code: "CW1", CreateId: 1, CreateName: "", CoverId: 1, CoverPath:""},
        {Id: 2, Name: "课件 2", Code: "CW2", CreateId: 2, CreateName: "", CoverId: 2, CoverPath:""},
    }, nil
}

2、并发执行

但咱们获取课件之后,填充用户信息和文件信息是能够并行执行的,咱们能够批改获取用户和文件的代码,把他们放到协程外面,这样就能够并行执行了:

...

    // 此处放到协程里
    go func() {
        // 批量获取用户信息
        users, err = UserMap(userIds)
        if err != nil {fmt.Println("获取用户谬误")
            return
        }
    }()

    // 此处放到协程里
    go func() {
        // 批量获取文件信息
        files, err = FileMap(fileIds)
        if err != nil {fmt.Println("获取文件谬误")
            return
        }
    }()

    ...

然而当你执行的时候你会发现这样是有问题的,因为上面的填充数据的代码有可能会在这两个协程执行实现之前去执行。也就是说最终的数据有可能没有填充用户信息和文件信息。那怎么办呢?这是咱们就能够应用 golang 的 waitgroup 了,次要作用就是协程的编排。

咱们能够等 2 个协程都执行实现再去走上面的填充逻辑

咱们持续批改代码成上面的样子

...

// 初始化一个 sync.WaitGroup
var wg sync.WaitGroup

func main() {
    // 查问课件
    ...
    // 获取用户 ID、文件 ID
    ...

    // 此处放到协程里
    wg.Add(1) // 计数器 +1
    go func() {defer wg.Done() // 计数器 -1
        // 批量获取用户信息
        users, err = UserMap(userIds)
        if err != nil {fmt.Println("获取用户谬误")
            return
        }
    }()

    // 此处放到协程里
    wg.Add(1) // 计数器 +1
    go func() {defer wg.Done() // 计数器 -1
        // 批量获取文件信息
        files, err = FileMap(fileIds)
        if err != nil {fmt.Println("获取文件谬误")
            return
        }
    }()

  // 阻塞期待计数器小于等于 0
    wg.Wait()

    // 填充
    for i, courseware := range coursewares {if user, ok := users[courseware.CreateId]; ok {coursewares[i].CreateName = user.Name
        }

        if file, ok := files[courseware.CoverId]; ok {coursewares[i].CoverPath = file.Path
        }
    }
    fmt.Println(coursewares)
}

...

咱们初始化一个 sync.WaitGroup,调用 wg.Add(1)给计数器加一,调用 wg.Done()计数器减一,wg.Wait()阻塞期待直到计数器小于等于 0,完结阻塞,持续往下执行。

3、errgroup

然而咱们当初又有这样的需要,咱们心愿如果获取用户或者获取文件有任何一方报错了,间接抛错,不再组装数据。

咱们能够像上面这样写

...

var goErr error
var wg sync.WaitGroup

...

func main() {
    ...

    // 此处放到协程里
    wg.Add(1)
    go func() {defer wg.Done()
        // 批量获取用户信息
        users, err = UserMap(userIds)
        if err != nil {
            goErr = err
            fmt.Println("获取用户谬误:", err)
            return
        }
    }()

    // 此处放到协程里
    wg.Add(1)
    go func() {defer wg.Done()
        // 批量获取文件信息
        files, err = FileMap(fileIds)
        if err != nil {
            goErr = err
            fmt.Println("获取文件谬误:", err)
            return
        }
    }()

    wg.Wait()

    if goErr != nil {fmt.Println("goroutine err:", err)
        return
    }

    ...
}

...

把谬误放在 goErr 中,完结阻塞后判断协程调用是否抛错。

那 golang 外面有没有相似这样的实现呢?答案是有的,那就是 errgroup。其实和咱们下面的办法差不多,然而 errgroup 包做了一层构造体的封装,也不须要在每个协程外面判断 error 传给 errGo 了。

上面是 errgroup 的实现

package main

import (
    "errors"
    "fmt"
    "golang.org/x/sync/errgroup"
    "time"
)

type Courseware struct {
    Id         int64
    Name       string
    Code       string
    CreateId   int64
    CreateName string
    CoverId   int64
    CoverPath string
}

type User struct {
    Id   int64
    Name string
}

type File struct {
    Id   int64
    Path string
}

var coursewares []Courseware
var users map[int64]User
var files map[int64]File
var err error
// 定义一个 errgroup
var eg errgroup.Group

func main() {
    // 查问课件
    coursewares, err = CoursewareList()
    if err != nil {fmt.Println("获取课件谬误:", err)
        return
    }

    // 获取用户 ID、文件 ID
    userIds := make([]int64, 0)
    fileIds := make([]int64, 0)
    for _, courseware := range coursewares {userIds = append(userIds, courseware.CreateId)
        fileIds = append(fileIds, courseware.CoverId)
    }


    // 此处放到协程里
    eg.Go(func() error {
        // 批量获取用户信息
        users, err = UserMap(userIds)
        if err != nil {fmt.Println("获取用户谬误:", err)
            return err
        }
        return nil
    })

    // 此处放到协程里
    eg.Go(func() error {
        // 批量获取文件信息
        files, err = FileMap(fileIds)
        if err != nil {fmt.Println("获取文件谬误:", err)
            return err
        }
        return nil
    })

  // 判断 group 中是否有报错
    if goErr := eg.Wait(); goErr != nil {fmt.Println("goroutine err:", err)
        return
    }

    // 填充
    for i, courseware := range coursewares {if user, ok := users[courseware.CreateId]; ok {coursewares[i].CreateName = user.Name
        }

        if file, ok := files[courseware.CoverId]; ok {coursewares[i].CoverPath = file.Path
        }
    }
    fmt.Println(coursewares)
}

func UserMap(ids []int64) (map[int64]User, error) {time.Sleep(3 * time.Second)
    return map[int64]User{1: {Id: 1, Name: "liu"},
        2: {Id: 2, Name: "kang"},
    }, errors.New("sql err")
}

func FileMap(ids []int64) (map[int64]File, error) {time.Sleep(3 * time.Second)
    return map[int64]File{1: {Id: 1, Path: "/a/b/c.jpg"},
        2: {Id: 2, Path: "/a/b/c/d.jpg"},
    }, nil
}

func CoursewareList() ([]Courseware, error) {time.Sleep(3 * time.Second)
    return []Courseware{{Id: 1, Name: "课件 1", Code: "CW1", CreateId: 1, CreateName: "", CoverId: 1, CoverPath:""},
        {Id: 2, Name: "课件 2", Code: "CW2", CreateId: 2, CreateName: "", CoverId: 2, CoverPath:""},
    }, nil
}

当然,errgroup 中也有针对上下文的 errgroup.WithContext 函数,如果咱们想管制申请接口的工夫,用这个是最合适不过的。如果申请超时会返回一个敞开上下文的报错,像上面这样

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/errgroup"
    "time"
)

type Courseware struct {
    Id         int64
    Name       string
    Code       string
    CreateId   int64
    CreateName string
    CoverId    int64
    CoverPath  string
}

type User struct {
    Id   int64
    Name string
}

type File struct {
    Id   int64
    Path string
}

var coursewares []Courseware
var users map[int64]User
var files map[int64]File
var err error

func main() {
    // 查问课件
    ...

    // 获取用户 ID、文件 ID
    ...

  // 定义一个带超时工夫的上下文,1 秒钟超时
    ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancelFunc()
  // 定义一个带上下文的 errgroup,应用下面带有超时工夫的上下文
    eg, ctx := errgroup.WithContext(ctx)
    // 此处放到协程里
    eg.Go(func() error {
        // 批量获取用户信息
        users, err = UserMap(ctx, userIds)
        if err != nil {fmt.Println("获取用户谬误:", err)
            return err
        }
        return nil
    })

    // 此处放到协程里
    eg.Go(func() error {
        // 批量获取文件信息
        files, err = FileMap(ctx, fileIds)
        if err != nil {fmt.Println("获取文件谬误:", err)
            return err
        }
        return nil
    })

    if goErr := eg.Wait(); goErr != nil {fmt.Println("goroutine err:", err)
        return
    }

    // 填充
    for i, courseware := range coursewares {if user, ok := users[courseware.CreateId]; ok {coursewares[i].CreateName = user.Name
        }

        if file, ok := files[courseware.CoverId]; ok {coursewares[i].CoverPath = file.Path
        }
    }
    fmt.Println(coursewares)
}

func UserMap(ctx context.Context, ids []int64) (map[int64]User, error) {result := make(chan map[int64]User)
    go func() {time.Sleep(2 * time.Second) // 伪装申请超过 1 秒钟
        result <- map[int64]User{1: {Id: 1, Name: "liu"},
            2: {Id: 2, Name: "kang"},
        }
    }()

    select {case <-ctx.Done(): // 如果上下文完结间接返回错误信息
        return nil, ctx.Err()
    case res := <-result: // 返回正确后果
        return res, nil
    }
}

func FileMap(ctx context.Context, ids []int64) (map[int64]File, error) {return map[int64]File{1: {Id: 1, Path: "/a/b/c.jpg"},
        2: {Id: 2, Path: "/a/b/c/d.jpg"},
    }, nil
}

func CoursewareList() ([]Courseware, error) {time.Sleep(3 * time.Second)
    return []Courseware{{Id: 1, Name: "课件 1", Code: "CW1", CreateId: 1, CreateName: "", CoverId: 1, CoverPath:""},
        {Id: 2, Name: "课件 2", Code: "CW2", CreateId: 2, CreateName: "", CoverId: 2, CoverPath:""},
    }, nil
}

执行下面的代码:

go run waitgroup.go
获取用户谬误:context deadline exceeded
goroutine err: context deadline exceeded

正文完
 0