KDP(数据服务平台)是一款由 KaiwuDB 独立自主研发的数据服务产品,以 KaiwuDB 为外围,面向 AIoT 场景打造的一站式数据服务平台,满足工业物联网、数字能源、车联网、智慧产业等行业外围业务场景下数据采集、解决、计算、剖析、利用的综合业务需要,实现“业务即数据,数据即服务”,助力企业从数据中开掘更大的商业价值。
在开发数据服务平台的实时计算组件时,可能会遇到这样的问题:实时计算组件向用户提供自定义规定性能,用户注册多个规定运行一段时间后,再批改规定的定义并重新启动,会产生协程泄露。
一、实在案例
本文将用伪代码全面介绍开发数据服务平台的实时计算组件过程中,可能遇到的协程泄露问题。
// 规定的大抵数据结构
type DummyRule struct{
BaseRule
sorce []Source
sink []Sink
//flow map key:flow 名称,value:flow 实例
flow map[string]Flow
...
}
上述 DummyRule 是本次示例的规定数据结构,它蕴含多个数据起源 Source,多个数据指标 Sink,及数据流 Flow。规定具体过程如下图:
1 和 2 是两个源,首先别离用加法解决 1 和 2 两个源;其次调用 Merge 操作合成一个流;接着进行 Fanout 操作,生成两个雷同的流,别离流入 7 和 8;最终通过 7 和 8 的数字类型转成字符串,别离写入到 out1.txt 和 out2.txt 文件中。
type Source struct{consumers []file.reader
out chan interface{}
ctx context.Context
cancel context.CancelFunc
...
}
上图是 Source 类数据源的伪代码,consumers 是用来读取文件数据的读取器,out 是用来传递给下一个数据流的通道,ctx 是 Go 的上下文。consumers 读取文件数据是一个独自的协程,读取的数据将放入 out 中,期待下一个数据流的生产。
type Sink struct{producers []file.writer
in chan interface{}
ctx context.Context
cancel context.CancelFunc
...
}
上图是 Sink 类数据指标的伪代码,producers 是用来写文件的写入器,in 是用来承受上一个数据流的通道,ctx 是 Go 的上下文,producers 写文件数据也是一个独自的协程。
func(fm FlatMap) Via(flow streams.Flow) streams.Flow{go fm.transmit(flow)
return flow
}
上图是数据流传递的源码。FlatMap 的用法是 curFlow := prevFlow.Via(nextFlow),这样能够把前一个 Flow 传递给下一个 Flow,能够看到一次数据流传递过程是在一个协程中进行的。
从后面源码可知,这个示例规定至多存在 10 个协程,但实际上,要比 10 个协程多得多。可见,在数据服务平台的实时计算组件中,协程治理是十分复杂的。
应用 go pprof,top,go traces 等工具进行重复测试和排查后,咱们才发现协程泄露是因为规定中 Sink 的 Context 未被正确勾销导致。
Context 是治理 goroutine 重要的语言个性。学会正确应用 Context,能够更好地厘清 goroutine 的关系并加以治理。从上述实例能够看出 Context 的重要性,学会正确应用 Context,不仅能够进步代码品质,更能够防止大量的协程泄露排查工作。
二、走进 Context
1. 介绍
Context 通常被称为上下文,在 Go 语言中,了解为 goroutine 的运行状态、现场,存在上上层 goroutine Context 的传递,下层 goroutine 会把 Context 传递给上层 goroutine。
每个 goroutine 在运行前,都要当时晓得程序以后的执行状态,通常将这些状态封装在一个 Context 变量中,传递给要执行的 goroutine。
在网络编程中,当接管到一个网络申请 Request,解决 Request 时,可能会在多个 goroutine 中解决。而这些 goroutine 可能须要共享 Request 的一些信息,当 Request 被勾销或者超时,所有从这个 Request 创立的 goroutine 也要被完结。
Go Context 包不仅实现了在程序单元之间共享状态变量的办法,同时能通过简略的办法,在被调用程序单元内部,通过设置 ctx 变量的值,将过期或撤销等信号传递给被调用的程序单元。
在网络编程中,如果存在 A 调用 B 的 API,B 调用 C 的 API,那么如果 A 调用 B 勾销,那么 B 调用 C 也应该被勾销。通过 Context 包,能够十分不便地在申请 goroutine 之间传递申请数据、勾销信号和超时信息。
Context 包的外围时 Context 接口:
// A Context carries a deadline, a cancellation signal, and other values across
// API boundaries
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface{
// 返回一个超时工夫,到期则勾销 context。在代码中,能够通过 deadline 为 io 操作设置超过工夫
Deadline() (deadline time.Time, ok bool)
// 返回一个 channel,用于接管 context 的勾销或者 deadline 信号。// 当 channel 敞开,监听 done 信号的函数会立刻放弃以后正在执行的操作并返回。// 如果 context 实例时不可能勾销的,那么
// 返回 nil,比方空 context,valueCtx
Done()}
2. 应用办法
对于 goroutine,他们的创立和调用关系总是像层层调用进行的,就像一个树状构造,而更靠顶部的 Context 应该有方法被动敞开上司的 goroutine 的执行。为了实现这种关系,Context 也是一个树状构造,叶子节点总是由根节点衍生进去的。
要创立 Context 树,第一步应该失去根节点,Context.Backupgroup 函数的返回值就是根节点。
func Background() Context{return background}
该函数返回空的 Context,该 Context 个别由接管申请的第一个 goroutine 创立,是与进入申请对应的 Context 根节点,他不能被勾销,也没有值,也没有过期工夫。他经常作为解决 Request 的顶层的 Context 存在。
有了根节点,就能够创立子孙节点了,Context 包提供了一系列办法来创立他们:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {}
func WithDeadline(parent Context, d time.Time)(Context, CancelFunc) {}
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {}
func WithValue(parent Context, key, val interface{}) Context {}
函数都接管一个 Context 类型的 parent,并返回一个 Context 类型的值,这样就层层创立除不同的 Context,子节点是从复制父节点失去,并且依据接管参数设定子节点的一些状态值,接着就能够将子节点传递给上层的 goroutine 了。
怎么通过 Context 传递扭转后的状态呢?
在父 goroutine 中能够通过 Withxx 办法获取一个 cancel 办法,从而取得了操作子 Context 的势力。
(1)WithCancel
WithCancel 函数,是将父节点复制到子节点,并且返回一个额定的 CancelFunc 函数类型变量,该函数类型的定义为:type CancelFunc func()
调用 CancelFunc 将撤销对应的子 Context 对象。在父 goroutine 中,通过 WithCancel 能够创立子节点的 Context,还取得了子 goroutine 的控制权,一旦执行了 CancelFunc 函数,子节点 Context 就完结了,子节点须要如下代码来判断是否曾经完结,并退出 goroutine:
select {case <- ctx.Cone():
fmt.Println("do some clean work ......")
}
(2)WithDeadline
WithDeadline 函数作用和 WithCancel 差不多,也是将父节点复制到子节点,然而其过期工夫是由 deadline 和 parent 的过期工夫独特决定。当 parent 的过期工夫早于 deadline 时,返回的过期工夫与 parent 的过期工夫雷同。父节点过期时,所有的子孙节点必须同时敞开。
(3)WithTimeout
WithTimeout 函数和 WithDeadline 相似,只不过,他传入的是从当初开始 Context 残余的生命时长。他们都同样也都返回了所创立的子 Context 的控制权,一个 CancelFunc 类型的函数变量。
当顶层的 Request 申请函数完结时,咱们能够 cancel 掉某个 Context,而子孙的 goroutine 依据 select ctx.Done() 来判断完结。
(4)Withvalue
WithValue 函数,返回 parent 的一个正本,调用该正本的 Value(key) 办法将失去 value。这样,咱们不仅将根节点原有的值保留了,还在子孙节点中退出了新的值;留神如果存在 key 雷同,则会笼罩。
3. 例子
package main
import (
"context"
"fmt"
"time"
)
func main() {ctxWithCancel, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
go worker(ctxWithCancel, "[1]")
go worker(ctxWithCancel, "[2]")
go manager(cancel)
<-ctxWithCancel.Done()
// 暂停 1 秒便于协程的打印输出
time.Sleep(1 * time.Second)
fmt.Println("example closed")
}
func manager(cancel func()) {time.Sleep(10 * time.Second)
fmt.Println("manager called cancel()")
cancel()}
func worker(ctxWithCancle context.Context, name string) {
for {
select {case <- ctxWithCancel.Done():
fmt.Println(name, "return for ctxWithCancel.Done()")
return
default:
fmt.Println(name, "working")
}
time.Sleep(1 * time.Second)
}
}
这个过程的 Context 的架构图:
[1]working
[2]working
[2]working
[1]working
[1]working
[2]working
[2]working
[1]working
[1]working
[2]working
[1]return for ctxWithCancel.Done()
[2]return for ctxWithCancel.Done()example closed
可见,这次 worker 完结是因为 ctxWithCancel 的计时器到点引起的。
把 manager 时长改成 2 秒,WithTimeout 时长不变,再运行一次,worker 只工作了 2 秒就被 manager 提前叫停了。
[1]working
[2]working
[2]working
[1]workingmanager called cancel()
[1]return for ctxWithCancel.Done()
[2]return for ctxWithCancel.Done()example closed