17Go-语言几个并发模式

45次阅读

共计 15223 个字符,预计需要花费 39 分钟才能阅读完成。

并发模式

要想写出高效简单的并发程序,还需要了解下常用的 goroutine 和 channel 以哪种方式写,下面介绍 3 种常用的并发模式,写出更简化高效的并发。

1、runner

Runner 模式可以理解为执行者,也就是来控制程序的执行,它可以去执行任何程序,程序都是受监控的,可以去终止这些程序。当我们需要调度后台处理任务程序的时候,这种模式很拥有。简单说就是,控制、执行,中断、退出。

创建 Runner 结构体

/*
runner 可以执行任何程序,可以监控程序,可以发送信号终止程序
 */
type Runner struct {
    interrupt chan os.Signal   // 发送的信号,用来终止程序
    complete  chan error       // 用于通知任务全部完成
    timeout   <-chan time.Time // 程序的超时时间
    tasks     []func(int)  // 要执行的任务
}

Runner 结构体很简单,主要包括 中断信号、完成信号、超时信号,以及要执行的任务队列。也就是说它是通过 channel 通道的信息读取来控制程序的执行、中断和退出,它能实现

  • 程序在分配的时间内完成工作,正常终止
  • 程序没有及时完成巩固走,自动杀死
  • 接收到操作系统发送的中断时间,程序立刻清理状态并停止

这三个 channel 分别是 os.Signal、error、time.Time 类型,这个中断信号类型是个 os.Signal 的通道,它是用来从操作系统中接收中断事件。完成时信号是个 error 类型,也就是正常完成,它为 nil, 只要不为 nil, 就有错误发生呗。timeout 只要能取出值,表示超时了,执行超时操作。

tasks 就是一个简单的切片,类型是接收一个 int 型参数的函数类型。

创建简单工厂函数。

// 工厂函数
func New(tm time.Duration) *Runner{
    return  &Runner{complete:make(chan error),// 无缓冲
        timeout:time.After(tm),
        interrupt:make(chan os.Signal,1), // 有缓冲
    }
}

这里关注两点,complete是无缓冲通道,interrupt是有缓冲通道。因为,main.runtime需要等 complete,任务完成,程序退出,所以,complete 必须是同步的,无缓冲的。

interrupt 初始化缓存区容量为 1,这样可以保证通道至少能接收一个来自运行时的 os.Signal 值,保证运行时发送这个事件的时候不会阻塞。

创建任务添加方法

// 将需要执行的任务,添加到 Runner 里
func (r *Runner) Add(tasks ...func(int)){r.tasks = append(r.tasks,tasks...)
}

创建两个错误类型变量

var ErrTimeOut = errors.New("执行者执行超时")
var ErrInterrupt = errors.New("执行者被中断")

这两个错误时接收到相应信号时返回的,一个是超时 一个是中断。

创建 run 方法

// 执行任务,执行的过程中接收到中断信号时,返回中断错误
// 如果任务全部执行完,还没有接收到中断信号,则返回 nil
func (r *Runner) run() error {
    for id, task := range r.tasks {if r.isInterrupt() {return ErrInterrupt}
        task(id)// 执行任务
    }
    return nil
}

// 检查是否接收到了中断信号
func (r *Runner) isInterrupt() bool {
    select {
    case <-r.interrupt:
        signal.Stop(r.interrupt)
        return true
    default:
        return false
    }
}

也就是,先判断有没有中断信号,能取出返回中断错误,没有,执行任务代码。这里判断是否接收中断信号isInterrupt 使用 select 语法,有就执行 case,没有就 default。

创建 start 方法,监控任务,也就是 run 方法的包装

// 开始执行所有任务,并且监视通道事件
func (r *Runner) Start() error {
    // 希望接收哪些系统信号
    signal.Notify(r.interrupt, os.Interrupt)

    go func() {r.complete <- r.run()
    }()

    select {
    case err := <-r.complete:
        return err
    case <-r.timeout:
        return ErrTimeOut
    }
}

这个是 runner 包最后一个方法了,它从操作系统接收中断信号,监控超时信号,一旦超时返回超时错误。一旦调用 run 方法过程中出现中断 信号,赋值给 r.complete 终止信号。只要此执行者 Runner 出现中止、正常完成、超时,就会退出。

完成的 runner 代码 如下:

package runner

import (
    "errors"
    "os"
    "os/signal"
    "time"
)

var ErrTimeOut = errors.New("执行者执行超时")
var ErrInterrupt = errors.New("执行者被中断")
/*
runner 可以执行任何程序,可以监控程序,可以发送信号终止程序
 */
type Runner struct {
    interrupt chan os.Signal   // 发送的信号,用来终止程序
    complete  chan error       // 用于通知任务全部完成
    timeout   <-chan time.Time // 程序的超时时间
    tasks     []func(int)      // 要执行的任务
}

// 工厂函数
func New(tm time.Duration) *Runner {
    return &Runner{complete:  make(chan error), // 无缓冲
        timeout:   time.After(tm),
        interrupt: make(chan os.Signal, 1), // 有缓冲
    }
}

// 将需要执行的任务,添加到 Runner 里
func (r *Runner) Add(tasks ...func(int)) {r.tasks = append(r.tasks, tasks...)
}

// 执行任务,执行的过程中接收到中断信号时,返回中断错误
// 如果任务全部执行完,还没有接收到中断信号,则返回 nil
func (r *Runner) run() error {
    for id, task := range r.tasks {if r.isInterrupt() {return ErrInterrupt}
        task(id)
    }
    return nil
}

// 检查是否接收到了中断信号
func (r *Runner) isInterrupt() bool {
    select {
    case <-r.interrupt:
        signal.Stop(r.interrupt)
        return true
    default:
        return false
    }
}
// 开始执行所有任务,并且监视通道事件
func (r *Runner) Start() error {
    // 希望接收哪些系统信号
    signal.Notify(r.interrupt, os.Interrupt)

    go func() {r.complete <- r.run()
    }()

    select {
    case err := <-r.complete:
        return err
    case <-r.timeout:
        return ErrTimeOut
    }
}

使用 runner 管理工作:

package main

import (
    "a_tour_of_go/runner" // 这个是写的 runner 所在包
    "log"
    "os"
    "time"
)

// 必须在 3 内完成,超时时间
const timeout = 3 * time.Second

func main() {log.Println("开始工作")
    r := runner.New(timeout)                        // 初始化 runner
    r.Add(createTask(), createTask(), createTask()) // 添加任务
    // 执行任务并处理结果
    if err := r.Start(); err != nil {
        switch err {
        case runner.ErrTimeOut:
            log.Println("超时错误")
            os.Exit(1)
        case runner.ErrInterrupt:
            log.Println("中断错误")
            os.Exit(2)
        }
    }
    log.Print("工作完成")
}

func createTask() func(int) {return func(id int) {log.Printf("正在执行任务 #%d", id)
        time.Sleep(time.Duration(id) * time.Second)
    }
}

上面我们在 main 函数中初始化并调用我们的 runner。初始化超时时间 3 秒,添加了 3 个任务,每个任务打印一句话,睡眠了几秒。然后调用 r.Start()函数执行任务。正常会返回 nil, 会打印工作完成。异常会打印我们的错误,调用 os.Exit()以错误码退出。比如我们第 3 个任务就是睡眠 3 秒,这个是超时的,所以就会出现;

2019/06/24 09:58:39 开始工作
2019/06/24 09:58:39 正在执行任务 #0 
2019/06/24 09:58:39 正在执行任务 #1 
2019/06/24 09:58:40 正在执行任务 #2 
2019/06/24 09:58:42 超时错误

如果我们,使用命令 go run main.go , 在执行过程中按下 Ctrl+C , 就会中断程序,如下:

2019/06/24 10:02:00 开始工作
2019/06/24 10:02:00 正在执行任务 #0
2019/06/24 10:02:00 正在执行任务 #1
2019/06/24 10:02:01 中断错误
exit status 2

现在我们改下超时时间为 4,试试:

2019/06/24 10:03:16 开始工作
2019/06/24 10:03:16 正在执行任务 #0 
2019/06/24 10:03:16 正在执行任务 #1 
2019/06/24 10:03:17 正在执行任务 #2 
2019/06/24 10:03:19 工作完成

那就正常了。

2、pool

现在使用有缓冲的通道实现一个资源池,这个资源池可以管理在任意多个 goroutine 之间共享的资源,这种在需要共享一组静态资源的情况下非常有用,比如 网络连接 数据库连接 等,我们在数据库操作的时候,比较常见的就是数据连接池,也可以基于我们实现的资源池来实现。

可以看出,资源池也是一种非常流畅性的模式,这种模式一般适用于在多个 goroutine 之间共享资源,每个 goroutine 可以从资源池里申请资源,使用完之后再放回资源池里,以便其他 goroutine 复用。

创建资源池结构体

// 一个安全的资源池,被管理的资源必须都实现 io.Close 接口
type Pool struct {
    m sync.Mutex
    res chan io.Closer
    factory func() (io.Closer,error)
    closed bool
}

这个结构体 Pool 有四个字段,其中 m 是一个互斥锁,这主要是用来保证在多个 goroutine 访问资源时,池内的值是安全的。

res字段是一个有缓冲的通道,用来保存共享的资源,这个通道的大小,在初始化 Pool 的时候就指定的。注意这个通道的类型是 io.Closer 接口,所以实现了这个 io.Closer 接口的类型都可以作为资源,交给我们的资源池管理。

factory这个是一个函数类型,它的作用就是当需要一个新的资源时,可以通过这个函数创建,也就是说它是生成新资源的,至于如何生成、生成什么资源,是由使用者决定的,所以这也是这个资源池灵活的设计的地方。

closed字段表示资源池是否被关闭,如果被关闭的话,再访问是会有错误的。

现在先这个资源池我们已经定义好了,也知道了每个字段的含义,下面就开时具体使用。刚刚我们说到关闭错误,那么我们就先定义一个资源池已经关闭的错误。

var ErrPoolClosed = errors.New("资源池已经关闭。")

非常简洁,当我们从资源池获取资源的时候,如果该资源池已经关闭,那么就会返回这个错误。单独定义它的目的,是和其他错误有一个区分,这样需要的时候,我们就可以从众多的 error 类型里区分出来这个ErrPoolClosed

下面我们就该为创建 Pool 专门定一个函数了,这个函数就是工厂函数,我们命名为New

创建工厂函数

// 创建一个资源池
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 {return nil, errors.New("size 的值太小了。")
    }
    return &Pool{
        factory: fn,
        res:     make(chan io.Closer, size),
    }, nil
}

这个函数创建一个资源池,它接收两个参数,一个 fn 是创建新资源的函数;还有一个 size 是指定资源池的大小。

这个函数里,做了 size 大小的判断,起码它不能小于或者等于 0,否则就会返回错误。如果参数正常,就会使用 size 创建一个有缓冲的通道,来保存资源,并且返回一个资源池的指针。

有了创建好的资源池,那么我们就可以从中获取资源了。

// 从资源池里获取一个资源
func (p *Pool) Acquire() (io.Closer,error) {
    select {
    case r,ok := <-p.res:
        log.Println("Acquire: 共享资源")
        if !ok {return nil,ErrPoolClosed}
        return r,nil
    default:
        log.Println("Acquire: 新生成资源")
        return p.factory()}
}

Acquire方法可以从资源池获取资源,如果没有资源,则调用 factory 方法生成一个并返回。

这里同样使用了 select 的多路复用,因为这个函数不能阻塞,可以获取到就获取,不能就生成一个。

这里的新知识是通道接收的多参返回,如果可以接收的话,第一参数是接收的值,第二个表示通道是否关闭。例子中如果 ok 值为 false 表示通道关闭,如果为 true 则表示通道正常。所以我们这里做了一个判断,如果通道关闭的话,返回通道关闭错误。

有获取资源的方法,必然还有对应的释放资源的方法,因为资源用完之后,要还给资源池,以便复用。在讲解释放资源的方法前,我们先看下关闭资源池的方法,因为释放资源的方法也会用到它。

关闭资源池,意味着整个资源池不能再被使用,然后关闭存放资源的通道,同时释放通道里的资源。

资源池关闭函数

// 关闭资源池,释放资源
func (p *Pool) Close() {p.m.Lock()
    defer p.m.Unlock()

    if p.closed {return}

    p.closed = true

    // 关闭通道,不让写入了
    close(p.res)

    // 关闭通道里的资源
    for r:=range p.res {r.Close()
    }
}

这个方法里,我们使用了互斥锁,因为有个标记资源池是否关闭的字段 closed 需要再多个 goroutine 操作,所以我们必须保证这个字段的同步。这里把关闭标志置为true

然后我们关闭通道,不让写入了,而且我们前面的 Acquire 也可以感知到通道已经关闭了。同比通道后,就开始释放通道中的资源,因为所有资源都实现了 io.Closer 接口,所以我们直接调用 Close 方法释放资源即可。

关闭方法有了,我们看看释放资源的方法如何实现。

资源释放函数

func (p *Pool) Release(r io.Closer){
    // 保证该操作和 Close 方法的操作是安全的
    p.m.Lock()
    defer p.m.Unlock()

    // 资源池都关闭了,就省这一个没有释放的资源了,释放即可
    if p.closed {r.Close()
        return
    }

    select {
    case p.res <- r:
        log.Println("资源释放到池子里了")
    default:
        log.Println("资源池满了,释放这个资源吧")
        r.Close()}
}

释放资源本质上就会把资源再发送到缓冲通道中,就是这么简单,不过为了更安全的实现这个方法,我们使用了互斥锁,保证 closed 标志的安全,而且这个互斥锁还有一个好处,就是不会往一个已经关闭的通道发送资源。

这是为什么呢?因为 Close 和 Release 这两个方法是互斥的,Close 方法里对 closed 标志的修改,Release 方法可以感知到,所以就直接 return 了,不会执行下面的 select 代码了,也就不会往一个已经关闭的通道里发送资源了。

如果资源池没有被关闭,则继续尝试往资源通道发送资源,如果可以发送,就等于资源又回到资源池里了;如果发送不了,说明资源池满了,该资源就无法重新回到资源池里,那么我们就把这个需要释放的资源关闭,抛弃了。

针对这个资源池管理的一步步都实现了,而且做了详细的讲解,下面就看下整个示例代码,方便理解。

完整代码

package pool

import (
    "errors"
    "io"
    "sync"
    "log"
)

// 一个安全的资源池,被管理的资源必须都实现 io.Close 接口
type Pool struct {
    m       sync.Mutex
    res     chan io.Closer
    factory func() (io.Closer, error)
    closed  bool
}

var ErrPoolClosed = errors.New("资源池已经被关闭。")

// 创建一个资源池
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 {return nil, errors.New("size 的值太小了。")
    }
    return &Pool{
        factory: fn,
        res:     make(chan io.Closer, size),
    }, nil
}
// 从资源池里获取一个资源
func (p *Pool) Acquire() (io.Closer,error) {
    select {
    case r,ok := <-p.res:
        log.Println("Acquire: 共享资源")
        if !ok {return nil,ErrPoolClosed}
        return r,nil
    default:
        log.Println("Acquire: 新生成资源")
        return p.factory()}
}

// 关闭资源池,释放资源
func (p *Pool) Close() {p.m.Lock()
    defer p.m.Unlock()

    if p.closed {return}

    p.closed = true

    // 关闭通道,不让写入了
    close(p.res)

    // 关闭通道里的资源
    for r:=range p.res {r.Close()
    }
}

func (p *Pool) Release(r io.Closer){
    // 保证该操作和 Close 方法的操作是安全的
    p.m.Lock()
    defer p.m.Unlock()

    // 资源池都关闭了,就省这一个没有释放的资源了,释放即可
    if p.closed {r.Close()
        return 
    }

    select {
    case p.res <- r:
        log.Println("资源释放到池子里了")
    default:
        log.Println("资源池满了,释放这个资源吧")
        r.Close()}
}

好了,资源池管理写好了,也知道资源池是如何实现的啦,现在我们看看如何使用这个资源池,模拟一个数据库连接池吧。

模拟数据库连接案例

package main

import (
    "flysnow.org/hello/common"
    "io"
    "log"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

const (
    // 模拟的最大 goroutine
    maxGoroutine = 5
    // 资源池的大小
    poolRes      = 2
)

func main() {
    // 等待任务完成
    var wg sync.WaitGroup
    wg.Add(maxGoroutine)

    p, err := common.New(createConnection, poolRes)
    if err != nil {log.Println(err)
        return
    }
    // 模拟好几个 goroutine 同时使用资源池查询数据
    for query := 0; query < maxGoroutine; query++ {go func(q int) {dbQuery(q, p)
            wg.Done()}(query)
    }

    wg.Wait()
    log.Println("开始关闭资源池")
    p.Close()}

// 模拟数据库查询
func dbQuery(query int, pool *common.Pool) {conn, err := pool.Acquire()
    if err != nil {log.Println(err)
        return
    }

    defer pool.Release(conn)

    // 模拟查询
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("第 %d 个查询,使用的是 ID 为 %d 的数据库连接", query, conn.(*dbConnection).ID)
}
// 数据库连接
type dbConnection struct {ID int32// 连接的标志}

// 实现 io.Closer 接口
func (db *dbConnection) Close() error {log.Println("关闭连接", db.ID)
    return nil
}

var idCounter int32

// 生成数据库连接的方法,以供资源池使用
func createConnection() (io.Closer, error) {
    // 并发安全,给数据库连接生成唯一标志
    id := atomic.AddInt32(&idCounter, 1)
    return &dbConnection{id}, nil
}

这时我们测试使用资源池的例子,首先定义了一个结构体 dbConnection,它只有一个字段,用来做唯一标记。然后dbConnection 实现了 io.Closer 接口,这样才可以使用我们的资源池。

createConnection函数对应的是资源池中的 factory 字段,用来创建数据库连接 dbConnection 的,同时为其赋予了一个唯一的标志。

接着我们就同时开了 5 个 goroutine,模拟并发的数据库查询dbQuery,查询方法里,先从资源池获取可用的数据库连接,用完后再释放。

这里我们会创建 5 个数据库连接,但是我们设置的资源池大小只有 2,所以再释放了 2 个连接后,后面的 3 个连接会因为资源池满了而释放不了,一会我们看下输出的打印信息就可以看到。

最后这个资源连接池使用完之后,我们要关闭资源池,使用资源池的 Close 方法即可。

2019/06/24 10:51:08 Acquire: 新生成资源
2019/06/24 10:51:08 Acquire: 新生成资源
2019/06/24 10:51:08 Acquire: 新生成资源
2019/06/24 10:51:08 Acquire: 新生成资源
2019/06/24 10:51:08 Acquire: 新生成资源
2019/06/24 10:51:08 第 3 个查询,使用的是 ID 为 4 的数据库连接
2019/06/24 10:51:08 资源释放到池子里了
2019/06/24 10:51:08 第 2 个查询,使用的是 ID 为 5 的数据库连接
2019/06/24 10:51:08 资源释放到池子里了
2019/06/24 10:51:08 第 4 个查询,使用的是 ID 为 1 的数据库连接
2019/06/24 10:51:08 资源池满了,释放这个资源吧
2019/06/24 10:51:08 关闭连接 1
2019/06/24 10:51:09 第 1 个查询,使用的是 ID 为 3 的数据库连接
2019/06/24 10:51:09 资源池满了,释放这个资源吧
2019/06/24 10:51:09 关闭连接 3
2019/06/24 10:51:09 第 0 个查询,使用的是 ID 为 2 的数据库连接
2019/06/24 10:51:09 资源池满了,释放这个资源吧
2019/06/24 10:51:09 关闭连接 2
2019/06/24 10:51:09 开始关闭资源池
2019/06/24 10:51:09 关闭连接 4
2019/06/24 10:51:09 关闭连接 5

可以看到,我们现在虽然看到了资源释放,但是没看到复用资源啊,因为我们执行 goroutine 太快了,在资源池还空的时候都生成了新的资源链接,现在我们修改下代码看:

// 模拟数据库查询
func dbQuery(query int, pool *pool.Pool) {

    // 由于并发太快了,我们现在让他们有些不同的延迟,错开来查看复用资源
    time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
    conn, err := pool.Acquire()
    if err != nil {log.Println(err)
        return
    }

    defer pool.Release(conn)
    // 模拟查询
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)

    log.Printf("第 %d 个查询,使用的是 ID 为 %d 的数据库连接", query, conn.(*dbConnection).ID)
}

我们添加一行代码,让所有的 goroutine 睡眠时间不同,并不是上来就获取资源链接,查看结果

2019/06/24 11:27:55 Acquire: 新生成资源
2019/06/24 11:27:55 Acquire: 新生成资源
2019/06/24 11:27:56 第 0 个查询,使用的是 ID 为 1 的数据库连接
2019/06/24 11:27:56 资源释放到池子里了
2019/06/24 11:27:56 第 4 个查询,使用的是 ID 为 2 的数据库连接
2019/06/24 11:27:56 资源释放到池子里了
2019/06/24 11:27:56 Acquire: 共享资源
2019/06/24 11:27:56 Acquire: 共享资源
2019/06/24 11:27:57 第 2 个查询,使用的是 ID 为 2 的数据库连接
2019/06/24 11:27:57 资源释放到池子里了
2019/06/24 11:27:57 第 1 个查询,使用的是 ID 为 1 的数据库连接
2019/06/24 11:27:57 资源释放到池子里了
2019/06/24 11:27:58 Acquire: 共享资源
2019/06/24 11:27:59 第 3 个查询,使用的是 ID 为 2 的数据库连接
2019/06/24 11:27:59 资源释放到池子里了
2019/06/24 11:27:59 开始关闭资源池
2019/06/24 11:27:59 关闭连接 1
2019/06/24 11:27:59 关闭连接 2

可以看到,我们只生成了 2 个自选,共享了 3 次,执行了 5 个 goroutine。

到这里,我们已经完成了一个资源池的管理,并且进行了使用测试。资源对象池的使用比较频繁,因为我们想把一些对象缓存起来,以便使用,这样就会比较高效,而且不会经常调用 GC。

为此 Go 1.6 开始为我们提供了原生的资源池管理,防止我们重复造轮子,这就是 sync.Pool,我们看下刚刚我们的例子,如果用sync.Pool 实现。

sync.Pool

package main

import (
    "log"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

const (
    // 模拟的最大 goroutine
    maxGoroutine = 5
)

func main() {
    // 等待任务完成
    var wg sync.WaitGroup
    wg.Add(maxGoroutine)

    p:=&sync.Pool{New:createConnection,}

    // 模拟好几个 goroutine 同时使用资源池查询数据
    for query := 0; query < maxGoroutine; query++ {go func(q int) {dbQuery(q, p)
            wg.Done()}(query)
    }

    wg.Wait()}

// 模拟数据库查询
func dbQuery(query int, pool *sync.Pool) {conn:=pool.Get().(*dbConnection)

    defer pool.Put(conn)

    // 模拟查询
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("第 %d 个查询,使用的是 ID 为 %d 的数据库连接", query, conn.ID)
}
// 数据库连接
type dbConnection struct {ID int32// 连接的标志}

// 实现 io.Closer 接口
func (db *dbConnection) Close() error {log.Println("关闭连接", db.ID)
    return nil
}

var idCounter int32

// 生成数据库连接的方法,以供资源池使用
func createConnection() interface{} {
    // 并发安全,给数据库连接生成唯一标志
    id := atomic.AddInt32(&idCounter, 1)
    return &dbConnection{ID:id}
}

进行微小的改变即可,因为系统库没有提供 New 这类的工厂函数,所以我们使用字面量创建了一个 sync.Pool,注意里面的New 字段,这是一个返回任意对象的方法,类似我们自己实现的资源池中的factory 字段,意思都是一样的,都是当没有可用资源的时候,生成一个。

这里我们留意到系统的资源池是没有大小限制的,也就是说默认情况下是无上限的,受内存大小限制。

资源的获取和释放对应的方法是 GetPut, 也很简洁,返回任意对象interface{}conn:=pool.Get().(*dbConnection) 这行代码用类型断言的方式,如果类型是*dbConnection, 就返回该类型,不是的话返回 nil, 这样写简介,不用写类型转换。

2019/06/24 11:49:30 第 2 个查询,使用的是 ID 为 4 的数据库连接
2019/06/24 11:49:30 第 0 个查询,使用的是 ID 为 1 的数据库连接
2019/06/24 11:49:30 第 3 个查询,使用的是 ID 为 5 的数据库连接
2019/06/24 11:49:31 第 1 个查询,使用的是 ID 为 2 的数据库连接
2019/06/24 11:49:31 第 4 个查询,使用的是 ID 为 3 的数据库连接

关于系统的资源池,我们需要注意的是它缓存的对象都是临时的,也就说下一次 GC 的时候,这些存放的对象都会被清除掉。

3、work

work 模式的目的是展示如何使用无缓冲的通道来创建一个 goroutine 池,这些 goroutine 执行
并控制一组工作,让其并发执行。在这种情况下,使用无缓冲的通道要比随意指定一个缓冲区大
小的有缓冲的通道好,因为这个情况下既不需要一个工作队列,也不需要一组 goroutine 配合执
行。无缓冲的通道保证两个 goroutine 之间的数据交换 。这种使用无缓冲的通道的方法允许使用
者知道什么时候 goroutine 池正在执行工作,而且如果池里的所有 goroutine 都忙,无法接受新的
工作的时候,也能及时通过通道来通知调用者。使用无缓冲的通道不会有工作在队列里丢失或者
卡住,所有工作都会被处理。

直接上代码:

package work

import "sync"

// 必须满足 Worker 类型才能使用工作池
type Worker interface {Task()
}
//goroutine 池,完成任何提交给 worker 的工作
type Pool struct {
    work chan Worker
    wg  sync.WaitGroup
}
// 创建一个新的工作池
func New(maxGoroutines int) *Pool{
    p:=Pool{work:make(chan Worker),
    }
    p.wg.Add(maxGoroutines)  // 添加多个 goroutine
    for i:=0;i<maxGoroutines;i++{ // 开启多个 gouroutine
        go func() {
            for w:=range p.work{ // 阻塞、等待这 channel 的值传来,只要我不关闭,就等着
                w.Task() // 执行工作代码}
            p.wg.Done()}()}
    return &p
}

//Run 提交工作到工作池
func (p *Pool)Run(w Worker){p.work<-w  // 传一个 Worker 类型的值给通道}
//Shutdown 等待所有 goroutine 停止工作
func  (p *Pool) Shutdown(){close(p.work)  // 关闭通道,那么等待的通道就会关闭,for range 下面的代码就执行,p.wg.Done()
    p.wg.Wait()  //maxGoroutines 几个,就等待几个 goroutine 关闭。}

work 模式其实就是通过构建多个 goroutine,通过 for range 阻塞等待的方式,事先准备好执行工作的代码。并且使用的是无缓冲通道,,使用的时候直接往通道传值,准备好的通道立马接收到值,马上执行工作代码。说到底其实就是 我一个工作,事先声明了多个 goroutine 等着做,只要我不关闭 channel,就一直等着,来一个我做一个,我是并发工作的。

整体结构如上面代码:

首先,我是一个工作管理者,我要定义我接收的类型,定义一个 Worker 接口,Task()方法。

然后,我要去多线程的去执行工作,创建一个 goroutine 池 结构,结构体成员一个是 channel,一个是 WaitGroup。我是通过 channel 通信来管理工作执行,waitgroup 保证每一个都能执行完。

再然后,我需要一个工厂函数,创建我这个工作管理者的 实例。思路如下:初始化的时候由用户告诉我需要开启多少个 goroutine 来并发的执行任务,每一个 goroutine 都是 在 p.wg.Add()p.wg.Done() 直接活着。比如我们传入 3 开启了 3 个 goroutine,这三个 goroutine 都是使用 for range 语法在等待着 p.work 这个 channel 有没有值,一直在等啊等,下面代码就不执行了,阻塞中。这就是工作原理,只要有人一传值,我立马就能工作,只要这个 channel 没有关闭,还会一直等。

再然后就是,我需要给别人提供添加任务 (也就是发送信息) 的入口,Run() 方法,直接往 p.work 中传入 Worker 类型的值即可。

最后,所有工作完成后是要收尾的 Shutdown(),首先需要关闭p.work 这个通道,只有关闭了通道,那么开启的 3 个 goroutine 就会不再阻塞,就会执行后面的 p.wg.Done() 方法,然后还有一行代码是 p.wg.Wait() 这个就是确保等待着 3 个 goroutine 都关闭即他们都执行 p.wg.Done() 这行代码,避免有些没有 done,主 goroutine 就退出了。

1)

测试代码

package main

import (
    "a_tour_of_go/work"
    "log"
    "sync"
    "time"
)

// 测试一组名字
var names=[]string{
    "张飞",
    "吕布",
    "关羽",
    "赵云",
    "二蛋",
}
// 定义个结构体,并实现 Worker 接口
type  namePrinter struct {name  string}
// 实现 Task,即实现了 Worker 接口
func  (m *namePrinter) Task(){log.Println(m.name)
    time.Sleep(time.Second)
}

func  main(){
    // 使用 3 个 goroutine 的工作池
    p:=work.New(3)
    var wg sync.WaitGroup
    wg.Add(100*len(names)) // 即每打算每个 names 的成员都开 100goroutine
    for i:=0;i<100 ;i++  {
        for _,name:=range names{
            np:=namePrinter{name:name,}
            go func() {p.Run(&np) // 提交工作
                wg.Done() // 结束提交工作的 goroutine}()}
    }
    wg.Wait() // 等着所有提交 goroutine 执行完毕
    p.Shutdown()// 关闭工作池}

测试代码的思路是:每一个名字,开启了 100 个 goroutine 去打印,一共有 500 个 goroutine。这些 goroutine 不是用来执行工作的,他们其实都只是提交工作命令的,也就是传递信息的。每一个 goroutine 往 channel 中传个值就完成,开销又不大。因为我们的工作池上面准备好工作了,就等着命令了,这边信息只要已传递,工作代码就执行,并且是开启了 3 个 goroutine 等待着,一次性提交 500 个命令,那么就会 3 个一组 3 个一组的打印,并发执行。等到所有提交 goroutine 都执行完成了,wg.Wait()的使命就结束了,接下来就是要关闭工作池了。Shutdown 方法会直到所有的工作都昨晚才返回,因为所有的提交命令 传递的同时,那边工作的 goroutine 已经在执行工作代码了 (w.Task() ),只要 工作代码(w.Task()) 不执行完,下面 p.wg.Done() 关闭工作 goroutine 的代码就不会执行。

这个例子最多会等待 3 个工作完成,因为只开启了 3 个工作 goroutine, 一旦都 Done()了,那就主函数退出了。

参考资料

  • go 语言实战
  • 飞雪无情的博客

正文完
 0