1. 简介
本文介绍了在并发编程中数据汇总的问题,并探讨了在并发环境下应用互斥锁和通道两种形式来保证数据安全性的办法。
首先,通过一个实例,形容了一个并发拉取数据并汇总的案例,并应用互斥锁来确保线程平安。而后,探讨了互斥锁的一些毛病,引出了通道作为一种代替计划,并介绍了通道的根本应用和个性。接下来,通过实例演示了如何应用通道来实现并发下的数据汇总。
最初,援用了 etcd 中应用通道实现协程并发下数据汇总的例子,展现了通道在理论我的项目中的利用。
2. 问题引入
在申请处理过程中,常常须要通过 RPC 接口拉取数据。有时候,因为数据量较大,单个数据拉取操作可能会导致整个申请的解决工夫较长。为了放慢处理速度,咱们通常思考同时开启多个协程并发地拉取数据。一旦多个协程并发拉取数据后,主协程须要汇总这些协程拉取到的数据,而后再返回后果。在这个过程中,往往波及对共享资源的并发拜访,为了保障线程安全性,通常会应用互斥锁。上面通过一个简略的代码来展现该过程:
package main
import (
"fmt"
"sync"
"time"
)
type Data struct {
ID int
Name string
}
var (
// 汇总后果
dataList []Data
// 互斥锁
mutex sync.Mutex
)
func fetchData(page int, wg *sync.WaitGroup) {
// 模仿 RPC 接口拉取数据的耗时操作
time.Sleep(time.Second)
// 假如从 RPC 接口获取到了一批数据
data := Data{
ID: page,
Name: fmt.Sprintf("Data %d", page),
}
// 应用互斥锁爱护共享数据的并发拜访
mutex.Lock()
defer mutext.Unlock()
dataList = append(dataList, data)
wg.Done()}
func main() {
var wg sync.WaitGroup
// 定义须要拉取的数据页数
numPages := 10
// 启动多个协程并发地拉取数据
for i := 1; i <= numPages; i++ {wg.Add(1)
go fetchData(i, &wg)
}
// 期待所有协程实现
wg.Wait()
// 打印拉取到的数据
fmt.Println("Fetched data:")
for _, data := range dataList {fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)
}
}
在上述示例中,咱们定义了一个共享的 dataList
切片用于保留拉取到的数据。每个 goroutine 通过调用 fetchData
函数来模仿拉取数据的过程,并应用互斥锁 mutex
爱护 dataList
的并发拜访。主协程应用 sync.WaitGroup
期待所有协程实现数据拉取工作,而后打印出拉取到的数据。通过并发地拉取数据,并应用互斥锁保障线程平安,咱们能够显著进步数据拉取的速度,并且确保数据的正确性和一致性。
回看上述实现,其实是波及到了多个协程操作同一份数据,有可能导致线程平安的问题,而后这里是通过互斥锁来保障线程平安的。的确,应用互斥锁是能够保障线程平安的,然而也是存在一些毛病的,比方竞争和阻塞,两个协程同时竞争互斥锁时,只有一个协程可能取得锁,而其余协程则会被阻塞,这个就可能导致性能瓶颈,当然在这个场景下问题不大。其次就是代码的复杂性进步了,应用互斥锁须要认真设计和治理,确保锁的正确获取和开释。这减少了代码的复杂性和保护老本,如果在代码中解决锁的形式不正确,可能会死锁,导致程序无奈继续执行。
那咱们其实就有疑难,在协程并发下数据汇总的场景,是否存在其余形式,不须要通过应用互斥锁,也可能保障线程平安呢? 其实还真有,Go
语言中的 channel
十分实用于这种状况。通过应用通道,咱们能够实现线程平安的数据共享和同步,而无需显式地应用互斥锁。上面咱们来理解一下channel
。
3. channel 的应用
3.1 channel 的根本介绍
3.1.1 根本阐明
channel
在 Go 语言中是一种非凡的数据结构,用于协程之间的通信和同步。它相似于一个先进先出 (FIFO) 的队列,用于数据的传输和共享。在并发环境中,能够将数据发送到通道,也能够从通道中接收数据,而这两个操作都是线程平安的。
应用 channel
的劣势在于它提供了内置的同步机制,无需显式地应用互斥锁来解决并发拜访。
当一个协程向通道发送数据时,如果通道已满,发送操作会被阻塞,直到有其余协程从通道中接收数据开释空间。同样地,当一个协程从通道接收数据时,如果通道为空,接管操作也会被阻塞,直到有其余协程向通道发送数据。
同时,当多个协程同时拜访通道时,Go 运行时零碎会主动解决协程之间的同步和并发拜访的细节,保证数据的正确性和一致性。从而能够释怀地在多个协程中应用通道进行数据的发送和接管操作,而不须要额定的锁或同步机制来保障线程平安。
因而,应用 channel
其实是能够防止常见的并发问题,如竞态条件和死锁,简化了并发编程的复杂性。
3.1.2 根本应用
通过上面对 channel
的根本介绍,咱们曾经对 channel
有了根本的理解,其实能够粗略了解其为一个并发平安的队列。上面来理解下 channel
的根本语法,从而可能开始应用channel
。
channel 基本操作分为创立 channel
, 发送数据到channel
, 接管channel
中的数据,以及敞开channel
。上面对其进行简略展现:
创立 channel,应用 make 函数创立通道,通道的类型能够依据须要抉择,例如 int
、string
等:
ch := make(chan int)
发送数据到 channel:应用 <-
操作符将数据发送到通道中
ch <- data
接管 channel 中的数据: 应用 <-
操作符从通道中接收数据
result := <-ch
敞开 channel,应用 close
函数敞开通道。敞开通道后,依然能够从通道接收数据,但无奈再向通道发送数据
close(ch)
通过下面 channel
的四个基本操作,便可能实现在不同协程间线程平安得传递数据。最初通过一个例子,残缺得展现 channel
的根本应用。
package main
import "fmt"
func main() {ch := make(chan string) // 创立字符串通道
defer close(ch)
go func() {ch <- "hello, channel!" // 发送数据到通道}()
result := <-ch // 从通道接收数据
fmt.Println(result)
}
在这个示例中,咱们创立了一个字符串通道ch
。而后,在一个独自的协程中,咱们向通道发送了字符串 ”hello, channel!”。最初,主协程从通道中接收数据,并将其打印进去。
通过应用通道,咱们能够实现协程之间的数据传输和同步,确保数据的平安共享和线程安全性。通道的应用可能简化并发编程的复杂性,提供一种高效、牢靠的形式来解决并发场景下的数据传递。
3.2 应用 channel 实现汇总数据
上面,咱们应用 channel
来实现并发数据汇总,替换掉之前应用互斥锁来保障线程平安的实现:
package main
import (
"fmt"
"sync"
"time"
)
type Data struct {
ID int
Name string
}
func fetchData(page int, ch chan Data, wg *sync.WaitGroup) {
// 模仿 RPC 接口拉取数据的耗时操作
time.Sleep(time.Second)
// 假如从 RPC 接口获取到了一批数据
data := Data{
ID: page,
Name: fmt.Sprintf("Data %d", page),
}
ch <- data // 将数据发送到通道
wg.Done()}
func main() {
var wg sync.WaitGroup
// 定义须要拉取的数据页数
numPages := 10
dataCh := make(chan Data, 10) // 创立用于接收数据的通道
// 启动多个协程并发地拉取数据
for i := 1; i <= numPages; i++ {wg.Add(1)
go fetchData(i, dataCh, &wg)
}
go func() {wg.Wait()
close(dataCh) // 敞开通道,示意数据曾经全副发送实现
}()
// 从通道接收数据并汇总
var dataList []Data
for data := range dataCh {dataList = append(dataList, data)
}
// 打印拉取到的数据
fmt.Println("Fetched data:")
for _, data := range dataList {fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)
}
}
在批改后的代码中,咱们创立了一个用于接收数据的 dataCh
。每个协程通过将数据发送到该 channel
来实现数据的汇总。主协程通过从channel
接收数据,并将其增加到 dataList
中实现数据的汇总过程。这种形式不须要显式地加锁和解锁,并且防止了互斥锁带来的复杂性和性能问题。
通过应用 channel
,咱们可能以一种更直观、更平安的形式实现协程之间的数据传递和同步。channel
在并发编程中起到了要害的作用,简化了并发操作的治理和实现。同时,它提供了内置的同步机制,保障了数据的正确性和一致性,防止了死锁和竞态条件的问题。
3.3 总结
协程间的并发下汇总数据能够归类为协程间的数据传递这个场景。在这个场景中,多个协程并发地拉取数据,而后将数据汇总到一个共享的数据结构中。为了保证数据的正确性和一致性,须要应用某种机制来确保多个协程对共享数据的并发拜访是平安的。
在原始的实现中,应用了互斥锁来爱护共享数据的并发拜访。互斥锁提供了互斥拜访的机制,确保同一时间只有一个协程能够访问共享数据,从而防止了数据竞争和不一致性。这种形式在保障线程平安的同时,引入了锁的开销和复杂性。
而应用 channel
来实现协程间的平安数据传递能够更简洁和高效。每个协程能够将拉取到的数据通过 channel
发送到主协程,主协程通过接管 channel
中的数据来进行汇总。channel
提供了并发平安的数据传递机制,协程之间的数据传输是同步和有序的。因为 channel
自身就提供了同步机制,不须要额定的锁和同步操作,可能更简洁地实现协程间的平安数据传递。
因而,如果须要在多个协程间实现数据传递,而且由此可能带来线程平安的问题,此时应用 channel
来实现是绝对比拟适合的。
4. 开源我的项目中的应用
假如咱们须要对 etcd
进行性能测试,此时须要模仿大量并发申请,对 etcd
进行负载测试,并收集每个申请的执行工夫、胜利 / 失败状态等后果数据。而后主协程须要收集每一个申请的后果数据,并进行统计计算,生成相应的性能报告。基于此,可能计算出总申请数、申请成功率、均匀执行工夫、最慢 / 最快申请等统计信息,以及谬误散布状况和慢速申请的详细信息。
从下面的讲述来看,其实咱们能够大略设想出这个模型,多个协程并发执行,而后获取每个申请的后果数据。而后主协程须要收集汇总这些数据,基于此来生成性能报告。这个模型其实也就是咱们下面所说的协程并发下的数据汇总,因而通过 channel
来实现协程间的数据传输,是十分适合的。
上面咱们来看看 etcd
中对应的实现。etcd
中存在一个 report
对象的实现,可能承受一系列的申请数据的后果,而后生成性能报告返回回去。构造体定义如下:
type report struct {
results chan Result
stats Stats
}
func (r *report) Results() chan<- Result { return r.results}
// Result describes the timings for an operation.
type Result struct {
Start time.Time
End time.Time
Err error
}
func newReport(precision string) *report {
r := &report{results: make(chan Result, 16),
}
return r
}
Result
构造体为单个测试的后果,而 report
构造体则用于整个测试过程的报告和统计信息。通过应用 results
通道,能够将每个测试的后果发送到 report
构造体中,以便进行统计和生成报告。
当进行性能压测时,首先通过 newReport
生成一个 report
对象,而后启动多个协程同时进行压测申请,每一个申请解决实现之后,便会生成一个处理结果,存储到 Result
对象当中。而后基于 report
对象的 Results
办法获取到对应的channel
,将处理结果传输给主协程。
主协程便通过遍历 report
对象中的 results
变量对应的channel
,汇总计算所有处理结果,基于此便可能生成压测后果和报告。上面来看其具体流程。
首先是创立一个 report
对象,而后启动多个协程来解决申请,将后果发送到 report
对象中的 results
对应的 channel
中。
// 这里 NewReportSample 办法, 其实是对下面 newReport 办法的一个封装
r := NewReportSample("%f")
// 这里假如只有一个协程,模仿执行一系列的测试,并将测试后果发送到 Report 对象的 results 通道中。go func() {start := time.Now()
for i := 0; i < 5; i++ {
// 不实在进行申请, 只是简略获取执行后果,将测试后果进行传输
end := start.Add(time.Second)
r.Results() <- Result{Start: start, End: end}
start = end
}
r.Results() <- Result{Start: start, End: start.Add(time.Second), Err: fmt.Errorf("oops")}
// 假如所有压测申请都执行实现了
close(r.Results())
}()
// 主协程 汇总所有的处理结果, 而后生成压测报告
stats := <-r.Stats()
以上代码中,r
是通过 NewReportSample("%f")
创立的一个 Report
对象。而后,在一个独自的协程中,执行了一系列的测试,并将测试后果发送到 r.Results()
通道中。
这段代码的作用是模仿执行一系列的测试,并将测试后果发送到 Report
对象的 results
通道中。通过应用 r.Results()
办法返回的通道,能够将测试后果发送到报告对象中进行统计和解决。
接下来,主协程应该一直从 r.Results()
办法返回的通道中读取数据,汇总所有的处理结果,从而生成压测报告。这个办法其实是被封装在 r.Stas()
办法中,具体如下:
func (r *report) Stats() <-chan Stats {
// 创立一个 channel
donec := make(chan Stats, 1)
// 启动一个协程来执行
go func() {defer close(donec)
r.processResults()
s := r.stats.copy()
if r.sps != nil {s.TimeSeries = r.sps.getTimeSeries()
}
// 执行实现的话, 将后果返回
donec <- s
}()
// 返回 channel
return donec
}
// Stats 办法启动的协程中,理论运行的工作
func (r *report) processResults() {st := time.Now()
// 遍历 r.results 办法中 channel 中的数据, 而后执行解决流程
for res := range r.results {r.processResult(&res)
}
// 后续执行一些具体的计算逻辑
}
上述代码是 report
构造体中的两个办法,其中 Stats()
办法返回一个只读的 Stats
通道。这个办法会在一个独自的协程中执行,并解决 results
通道中的测试后果。事实上就是汇总 channel
中的数据,而后进行肯定的解决,而后返回。
5. 总结
本文通过介绍并发编程中的数据汇总问题,提出了应用互斥锁和通道来保障线程平安的办法。互斥锁实用于临界区爱护和共享资源的互斥拜访,但可能存在死锁和性能瓶颈的问题。相比之下,通道提供了更直观和平安的协程间通信形式,防止了锁的问题,并提供了更灵便的并发模式。
基于以上内容的介绍,大略可能明确下,在数据传递和汇总的场景下,应用 channel
来实现可能是更为适合的,可能进步代码的可读性和并发安全性。心愿以上内容对你有所帮忙。