共计 5873 个字符,预计需要花费 15 分钟才能阅读完成。
学习对象:https://github.com/tmrts/go-p…。这个 repo 使用 go 语言实现了一些设计模式,包括常用的 Builder 模式,Singleton 模式等,也有列举出还未用 go 实现的模式,如 Bridge 模式等。
本文并非完整地介绍和解析这个 repo 里的每一行代码,只对个人认为值得学习和记录的地方进行说明,阅读过 repo 代码后再阅读本文比较合适。
Functional Options
这个模式是一种优雅地设置对象初始化参数的方式。考虑的点是:
如何友好地扩展初始化的选填参数
如何友好地处理默认值问题
函数签名见名知意
比较以下几种初始化对象参数的方法:
//name 是必填参数, timeout 和 maxConn 是选填参数,如果不填则设置为默认值
// pattern #1
func NewServer(name string, timeout time.Duration, maxConn uint) (*Server, error) {…}
// 这种方法最直观, 但也是最不合适的, 因为对于扩展参数需要修改函数签名, 且默认值需要通过文档获知
// pattern #2
type ServerConf struct {
Timeout time.Duration
MaxConn uint
}
func NewServer(name string, conf ServerConf) (*Server, error) {…} // 1)
func NewServer(name string, conf *ServerConf) (*Server, error) {…} // 2)
func NewServer(name string, conf …ServerConf) (*Server, error) {…} // 3)
// 改进: 使用了参数结构体, 增加参数不需要修改函数签名
// 1) conf 现在是必传, 实际上里面的是选填参数
// 2) 避免 nil; conf 可能在外部被改变.
// 3) 都使用默认值的时候可以不传, 但多个 conf 可能在配置上有冲突
// conf 的默认空值对于 Server 可能是有意义的.
// pattern #3: Functional Options
type ConfSetter func(srv *Server) error
func ServerTimeoutSetter(t time.Duration) ConfSetter {
return func(srv *Server) error {
srv.timeout = t
return nil
}
}
func ServerMaxConnSetter(m uint) ConfSetter {
return func(srv *Server) error {
srv.maxConn = m
return nil
}
}
func NewServer(name string, setter …ConfSetter) (*Server, error) {
srv := new(Server)
…
for _, s := range setter {
err := s(srv)
}
…
}
// srv, err := NewServer(“name”, ServerTimeoutSetter(time.Second))
// 使用闭包作为配置参数. 如果不需要配置选填参数, 只需要填参数 name.
上面的 pattern#2 尝试了三种方法来优化初始化参数的问题,但每种方法都有自己的不足之处。pattern#3,也就是 Functional Options,通过使用闭包来做优化,从使用者的角度来看,已经是足够简洁和明确了。当然,代价是初次理解这种写法有点绕,不如前两种写法来得直白。trade off
欲言又止稍加思考,容易提出这个问题:这跟 Builder 模式有什么区别呢?个人认为,Functional Options 模式本质上就是 Builder 模式:通过函数来设置参数。
参考文章:Functional options for friendly APIs
Circuit-Breaker
熔断模式:如果服务在一段时间内不可用,这时候服务要考虑主动拒绝请求(减轻服务方压力和请求方的资源占用)。等待一段时间后(尝试等待服务变为可用),服务尝试接收部分请求(一下子涌入过多请求可能导致服务再次不可用),如果请求都成功了,再正常接收所有请求。
// 极其精简的版本, repo 中版本详尽一些
type Circuit func() error
// Counter 的实现应该是一个状态机
type Counter interface {
OverFailureThreshold()
UpdateFailure()
UpdateSuccess()
}
var cnt Counter
func Breaker(c Circuit) Circuit {
return func() {
if cnt.OverFailureThreshold() {
return fmt.Errorf(“ 主动拒绝 ”)
}
if err := c(); err != nil {
cnt.UpdateFailure()
return err
}
cnt.UpdateSuccess()
return nil
}
}
熔断模式更像是中间件而不是设计模式:熔断器是一个抽象的概念而不是具体的代码实现;另外,如果要实现一个实际可用的熔断器,要考虑的方面还是比较多的。举些例子:需要提供手动配置熔断器的接口,避免出现不可控的请求情况;什么类型的错误熔断器才生效(恶意发送大量无效的请求可能导致熔断器生效),等等。
参考文章:Circuit Breaker pattern 参考实现:gobreaker
Semaphore
go 的标准库中没有实现信号量,repo 实现了一个:)repo 实现的实质是使用 chan。chan 本身已经具备互斥访问的功能,而且可以设定缓冲大小,只要稍加修改就可以当作信号量使用。另外,利用 select 语法,可以很方便地实现超时的功能。
type Semaphore struct {
resource chan struct{} // 编译器会优化 struct{} 类型, 使得所有 struct{} 变量都指向同一个内存地址
timeout time.Duration // 用于避免长时间的死锁
}
type TimeoutError error
func (s *Semaphore) Aquire() TimeoutError {
select {
// 会从上到下检查是否阻塞
// 如果 timeout 为 0, 且暂时不能获得 / 解锁资源, 会立即返回超时错误
case: <-s.resource:
return nil
case: <- time.After(s.timeout):
return fmt.Errorf(“timeout”)
}
}
func (s *Semaphore) Release() TimeoutError {
select {
// 同 Aquire()
case: s.resource <- struct{}{}:
return nil
case: <- time.After(s.timeout):
return fmt.Errorf(“timeout”)
}
}
func NewSemaphore(num uint, timeout time.Duration) (*Semaphore, error) {
if num == 0 {
return fmt.Errorf(“invalid num”) // 如果是 0, 需要先 Release 才能 Aquire.
}
return &Semaphore{
resource: make(chan strcut{}, num),
timeout: timeout,
}, nil // 其实返回值类型也不影响 Semaphore 正常工作, 因为 chan 是引用类型
}
Object Pool
标准库的 sync 包已经有实现了一个对象池,但是这个对象池接收的类型是 interface{}(万恶的范型),而且池里的对象如果不被其它内存引用,会被 gc 回收(同 java 中弱引用的 collection 类型类似)。repo 实现的对象池是明确类型的 ( 万恶的范型 +1),而且闲置不会被 gc 回收。但仅仅作为展示说明,repo 的实现没有做超时处理。下面的代码尝试加上超时处理。也许对使用者来说,额外增加处理超时错误的代码比较繁琐,但这是有必要的,除非使用者通读并理解了你的代码。trade off
type Pool struct {
pool chan *Object
timeout time.Duration
}
type TimeoutError error
func NewPool(total int, timeout time.Duration) *Pool {
p := &Pool {
pool: make(Pool, total),
timeout: timeout,
} //pool 是引用类型, 所以返回类型可以不是指针
for i := 0; i < total; i++ {
p.pool <- new(Object)
}
return p
}
func (p *Pool) Aquire() (*Object, TimeoutError) {
select {
case obj <- p.pool:
return obj, nil
case <- time.After(timeout):
return nil, fmt.Errorf(“timeout”)
}
}
func (p *Pool) Release(obj *Object) TimeoutError {
select {
case p.pool <- obj:
return nil
case <- time.After(timeout):
return nil, fmt.Errorf(“timeout”)
}
}
chan and goroutine
解析一下 repo 里 goroutine 和 chan 的使用方式,也不算是设计模式。
Fan-in pattern 主要体现如何使用 sync.WaitGroup 同步多个 goroutine。思考:这里的实现是如果 cs 的长度为 n, 那个要开 n 个 goroutine, 有没有办法优化为开常数个 goroutine?
// 将若干个 chan 的内容合并到一个 chan 当中
func Merge(cs …<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(len(cs))
// 将 send 函数在 for 循环中写成一个不带参数的匿名函数, 看起来会使代码更简洁,
// 但实际上所有 for 循环里的所有 goroutine 会公用一个 c, 代码不能正确实现功能.
send := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
for _, c := range cs {
go send(c)
}
// 开一个 goroutine 等待 wg, 然后关闭 merge 的 chan, 不阻塞 Merge 函数
go func() {
wg.Wait()
close(out)
}
return out
}
Fan-out pattern 将一个主 chan 的元素循环分发给若干个子 chan(分流)。思路比较简单就不贴代码了。思考:reop 实现的代码,如果其中一个子 chan 没有消费元素,那么整个分发流程都会卡住。是否可以优化?
Bounded Parallelism Pattern 比较完成的例子来说明如何时候 goroutine. 并发计算目录下文件的 md5.
func MD5All(root string) (map[string][md5.Size]byte, error) {// 因为 byte 是定长的, 使用数据更合适, 可读且性能也好一点
done := make(chan struct{}) // 用于控制整个流程是否暂停. 其实这里是用 context 可能会更好.
defer close(done)
paths, errc := walkFiles(done, root)
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
// 同上, 开 goroutine 等待所有 digester 结束
go func() {
wg.Wait()
close(c)
}()
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// 必须放在 m 处理结束后才检查 errc. 否则, 要等待 walkFiles 结束了才能开始处理 m
// 相反, 如果 errc 有信号, c 肯定已经 close 了
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string) // 这里可以适当增加缓冲, 取决于 walkFiles 快还是 md5.Sum 快
errc := make(chan error, 1) // 必须有缓冲, 否则死锁. 上面的代码 paths close 了才检查 errc
go func() {
defer close(paths) // 这里的 defer 不必要. defer 是运行时的, 有成本.
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New(“walk canceled”)
}
return nil
})
}()
return paths, errc
}
type result struct {
path string
sum [md5.Size]byte
err error
}
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
// 看 md5.Sum 先结束还是 done 信号先到来
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}