从入门到放弃Java并发编程NIOChannel

前言上篇[【从入门到放弃-Java】并发编程-NIO使用]()简单介绍了nio的基础使用,本篇将深入源码分析nio中channel的实现。 简介channel即通道,可以用来读、写数据,它是全双工的可以同时用来读写操作。这也是它与stream流的最大区别。 channel需要与buffer配合使用,channel通道的一端是buffer,一端是数据源实体,如文件、socket等。在nio中,通过channel的不同实现来处理 不同实体与数据buffer中的数据传输。 channel接口: package java.nio.channels;import java.io.IOException;import java.io.Closeable;/** * A nexus for I/O operations. * * <p> A channel represents an open connection to an entity such as a hardware * device, a file, a network socket, or a program component that is capable of * performing one or more distinct I/O operations, for example reading or * writing. * * <p> A channel is either open or closed. A channel is open upon creation, * and once closed it remains closed. Once a channel is closed, any attempt to * invoke an I/O operation upon it will cause a {@link ClosedChannelException} * to be thrown. Whether or not a channel is open may be tested by invoking * its {@link #isOpen isOpen} method. * * <p> Channels are, in general, intended to be safe for multithreaded access * as described in the specifications of the interfaces and classes that extend * and implement this interface. * * * @author Mark Reinhold * @author JSR-51 Expert Group * @since 1.4 */public interface Channel extends Closeable { /** * Tells whether or not this channel is open. * * @return <tt>true</tt> if, and only if, this channel is open */ public boolean isOpen(); /** * Closes this channel. * * <p> After a channel is closed, any further attempt to invoke I/O * operations upon it will cause a {@link ClosedChannelException} to be * thrown. * * <p> If this channel is already closed then invoking this method has no * effect. * * <p> This method may be invoked at any time. If some other thread has * already invoked it, however, then another invocation will block until * the first invocation is complete, after which it will return without * effect. </p> * * @throws IOException If an I/O error occurs */ public void close() throws IOException;}常见的channel实现有: ...

July 8, 2019 · 12 min · jiezi

Go-channel-实现原理分析

channel一个类型管道,通过它可以在goroutine之间发送和接收消息。它是Golang在语言层面提供的goroutine间的通信方式。 众所周知,Go依赖于称为CSP(Communicating Sequential Processes)的并发模型,通过Channel实现这种同步模式。Go并发的核心哲学是不要通过共享内存进行通信; 相反,通过沟通分享记忆。 下面以简单的示例来演示Go如何通过channel来实现通信。 package mainimport ( "fmt" "time")func goRoutineA(a <-chan int) { val := <-a fmt.Println("goRoutineA received the data", val)}func goRoutineB(b chan int) { val := <-b fmt.Println("goRoutineB received the data", val)}func main() { ch := make(chan int, 3) go goRoutineA(ch) go goRoutineB(ch) ch <- 3 time.Sleep(time.Second * 1)}结果为: goRoutineA received the data 3 上面只是个简单的例子,只输出goRoutineA ,没有执行goRoutineB,说明channel仅允许被一个goroutine读写。 接下来我们通过源代码分析程序执行过程,在讲之前,如果不了解go 并发和调度相关知识。请阅读这篇文章 https://github.com/guyan0319/... 说道channel这里不得不提通道的结构hchan。 hchan源代码在src/runtime/chan.go type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex}type waitq struct { first *sudog last *sudog}说明: ...

May 14, 2019 · 3 min · jiezi

Knative-Eventing-中-Channel-如何注入默认-Provisioner

摘要: 在 Knative Eventing 中创建 Broker 时,如果不指定 provisioner, 系统会自动创建默认的 provisioner, 那么这个机制是如何实现的呢? 本文基于 Knative Eventing 0.5 版本,介绍了这个实现机制。场景通常的在创建Broker时,我们需要通过 spec.ChannelTemplate 指定使用某个具体的 Channel Provisioner。例如这样的Broker: apiVersion: eventing.knative.dev/v1alpha1kind: Brokermetadata: name: pubsub-channelspec: channelTemplate: provisioner: apiVersion: eventing.knative.dev/v1alpha1 kind: ClusterChannelProvisioner name: gcp-pubsub这里通过spec.ChannelTemplate 指定了名称为gcp-pubsub的provisioner。那么我们也遇到过这样的Broker: apiVersion: eventing.knative.dev/v1alpha1kind: Brokermetadata: name: default并没有指定使用某个具体的 channel, 但创建完Broker之后会发现已经创建出来了Channel: apiVersion: eventing.knative.dev/v1alpha1kind: Channelmetadata: ... name: default-broker-8ml79 namespace: default ownerReferences: - apiVersion: eventing.knative.dev/v1alpha1 blockOwnerDeletion: true controller: true kind: Broker name: default uid: 2e4c3332-6755-11e9-a81f-00163f005e02spec: provisioner: apiVersion: eventing.knative.dev/v1alpha1 kind: ClusterChannelProvisioner name: in-memory...分析我们知道 Broker创建之后,会通过 reconcile controller 会创建相应的Channel, 也就是下面这段代码: ...

May 13, 2019 · 2 min · jiezi

Golang channel 源码分析

之前知道go团队在实现channel这种协程间通信的大杀器时只用了700多行代码就解决了,所以就去膜拜读了一把,但之后复盘总觉得多少有点绕,直到有幸找到一个神级PPT https://speakerdeck.com/kavya… 生动形象的解释了channel底层是怎么工作和实现的,于是就带着这篇PPT再来复盘一遍channel的源码Hchan 数据结构初始化make(chan task, 3)初始化channel在调用方有两种, 一种是带缓冲的一种是非缓冲的,其初始化的具体实现除了缓冲非缓冲,还分channel的元素是否是指针类型Send满足send条件下往这个channel发送数据的代码, 假设当前没有另一个goroutine来接收channel的数据G1:for task := range tasks { ch <- task}Send to a full channel当channel满了之后 c.qcount > c.dataqsiz 如果还有数据发送到该channel则获取当前运行的goroutine封装成sudog,将其插入sendq 队列并通知系统将当前goroutine停止此时hchan的结构大致长这样sendq 和 recvq 都是一个由链表实现的FIFO队列这里涉及到三个没见过的东西1.sudogsudog 是对当前运行的goroutine和需要发送数据的封装,有一个前驱指正和后驱指针,hchan的sendq和recvq队列则是由sudog形成的双向链表2.goparkunlock —> goparkgopark 将当前goroutine置为等待状态3.goready —> readygoready 将某个goroutine 唤醒释放阻塞的sender goroutine 上面说到,channel容量已满后, 会阻塞当前goroutine并加到发送队列中, 那么什么时候会释放这个阻塞的goroutine呢。 之前看channel的学习文章时都说 发送者和接受者必须是成双成对的 (现在理解为一个gopark, 一个goready),在下面channel的接收端代码中可以看到因为当从channel中接收数据时, 如果sendq队列上有等待的的goroutine, 则将它pop出来, 执行接收操作(一会儿再讲)后调用goready将其唤醒这里可以看到 虽然 golang 有一句名言叫做 “Do not communicate by sharing memory; instead, share memory by communicating.” 告诉我们用通信的方式来共享内存而不是用共享内存的方式来通信,在channel的内部, 接收者和发送者两个goroutine却是通过共享hchan来实现通信的 (但是发送和接收的数据是通过拷贝来传递的)。send channel 小结当hchan 上没有等待的接收队列 (recvq) 的情况下, 往channel 发送数据可以总结成以下步骤hchan 上锁判断当前hchan 是否有足够的buf空间如果有, 拷贝数据到buf中对应的位置如果buf空间不够,或者初始化的是无缓冲channel, 阻塞当前goroutine并将其封装成sudog插到sendq中等待被接受者唤醒hchan 解锁这里只列出了当“hchan 的接收者队列上没有等待的goroutine” 时这种情况, 因为在上一句打引号的的情况中有一种之后需要解释的骚操作。Rcevchannel 的接收实现实质上和发送区别不大, 如果当前没有阻塞等待发送的goroutine 并且buf中有数据, 则从buf中将当前recvx索引初将需要接收的数据拷贝出来, 然后将其在buf中清除Recv from Sender and wakeup Sender如果在从channel接收时,发送队列上有正在阻塞等待的goroutine, 就是上一节中提到的send groutine如何被唤醒的那块内容, 拷贝 + 唤醒Recv from empty channel如果当前无阻塞等待发送数据的goroutine, 并且buf中没有等待接收的数据, 则同send一样,将当前的goroutine, 需要接收的数据指针,封装成sudog插入recvQ队列尾部, 调用gopark停止当前goroutine上一节说到, 发送端在接收队列中无阻塞等待的goroutine时会阻塞并插到sendq队列中,并留下了一个悬链说当接收队列上有goroutine时会发生一个骚操作。按上面的代码来看,这种情况接受者收到的数据也应该是从sendq中取出发送方的sudog并将其发送的值拷贝出来,但是在channel的实现中,当往一个 ”空buf(或者非缓冲)但是接收者队列上有阻塞goroutine的” channel发送数据时, 发送方会直接把数据写到接收队列中那个等待接收的goroutine中。比起等接收者从buf中拷贝数据或者从sendq队列中pop出sudog再拷贝数据,这样做少了一次拷贝的过程非正常情况下的sender, recver未初始化的channel往已经关闭的channel发送数据从已关闭的channel接受数据LAST带着这篇PPT来看channel的源码感觉一切都一目了然了, 反正这篇PPT一定要看,而且里面还包含了channel在阻塞goroutine时 go调度器运行状态的描述。 ...

April 15, 2019 · 1 min · jiezi

基于Swoole的通用连接池 - 数据库连接池

连接池open-smf/connection-pool 是一个基于Swoole的通用连接池,常被用作数据库连接池。依赖依赖版本PHP>=7.0.0Swoole>=4.2.9 Recommend 4.2.13+安装通过Composer安装。composer require “open-smf/connection-pool:~1.0"使用更多示例。基本用法use Smf\ConnectionPool\ConnectionPool;use Smf\ConnectionPool\Connectors\CoroutineMySQLConnector;use Swoole\Coroutine\MySQL;go(function () { // MySQL连接数区间:[10, 30] $pool = new ConnectionPool( [ ‘minActive’ => 10, ‘maxActive’ => 30, ‘maxWaitTime’ => 5, ‘maxIdleTime’ => 20, ‘idleCheckInterval’ => 10, ], new CoroutineMySQLConnector, // 指明连接器实例,这里使用协程MySQL连接器,这样就可以创建一个协程MySQL的数据库连接池 [ ‘host’ => ‘127.0.0.1’, ‘port’ => ‘3306’, ‘user’ => ‘root’, ‘password’ => ‘xy123456’, ‘database’ => ‘mysql’, ’timeout’ => 10, ‘charset’ => ‘utf8mb4’, ‘strict_type’ => true, ‘fetch_mode’ => true, ] ); echo “初始化连接池…\n”; $pool->init(); defer(function () use ($pool) { echo “关闭连接池…\n”; $pool->close(); }); echo “从连接池中借出连接…\n”; /@var MySQL $connection */ $connection = $pool->borrow(); defer(function () use ($pool, $connection) { echo “向连接池归还连接…\n”; $pool->return($connection); }); // 执行查询语句 $status = $connection->query(‘SHOW STATUS LIKE “Threads_connected”’); var_dump($status);});在Swoole Server中的用法use Smf\ConnectionPool\ConnectionPool;use Smf\ConnectionPool\ConnectionPoolTrait;use Smf\ConnectionPool\Connectors\CoroutineMySQLConnector;use Smf\ConnectionPool\Connectors\PhpRedisConnector;use Swoole\Coroutine\MySQL;use Swoole\Http\Request;use Swoole\Http\Response;use Swoole\Http\Server;class HttpServer{ use ConnectionPoolTrait; protected $swoole; public function __construct(string $host, int $port) { $this->swoole = new Server($host, $port); $this->setDefault(); $this->bindWorkerEvents(); $this->bindHttpEvent(); } protected function setDefault() { $this->swoole->set([ ‘daemonize’ => false, ‘dispatch_mode’ => 1, ‘max_request’ => 8000, ‘open_tcp_nodelay’ => true, ‘reload_async’ => true, ‘max_wait_time’ => 60, ’enable_reuse_port’ => true, ’enable_coroutine’ => true, ‘http_compression’ => false, ’enable_static_handler’ => false, ‘buffer_output_size’ => 4 * 1024 * 1024, ‘worker_num’ => 4, // 每个Worker持有一个独立的连接池 ]); } protected function bindHttpEvent() { $this->swoole->on(‘Request’, function (Request $request, Response $response) { $pool1 = $this->getConnectionPool(‘mysql’); /@var MySQL $mysql */ $mysql = $pool1->borrow(); defer(function () use ($pool1, $mysql) { $pool1->return($mysql); }); $status = $mysql->query(‘SHOW STATUS LIKE “Threads_connected”’); $pool2 = $this->getConnectionPool(‘redis’); /**@var Redis $redis */ $redis = $pool2->borrow(); defer(function () use ($pool2, $redis) { $this->pools[‘redis’]->return($redis); }); $clients = $redis->info(‘Clients’); $json = [ ‘status’ => $status, ‘clients’ => $clients, ]; $response->header(‘Content-Type’, ‘application/json’); $response->end(json_encode($json)); }); } protected function bindWorkerEvents() { $createPools = function () { // 所有的MySQL连接数区间:[4 workers * 2 = 8, 4 workers * 10 = 40] $pool1 = new ConnectionPool( [ ‘minActive’ => 2, ‘maxActive’ => 10, ], new CoroutineMySQLConnector, [ ‘host’ => ‘127.0.0.1’, ‘port’ => ‘3306’, ‘user’ => ‘root’, ‘password’ => ‘xy123456’, ‘database’ => ‘mysql’, ’timeout’ => 10, ‘charset’ => ‘utf8mb4’, ‘strict_type’ => true, ‘fetch_mode’ => true, ]); $pool1->init(); $this->addConnectionPool(‘mysql’, $pool1); // 所有Redis连接数区间:[4 workers * 5 = 20, 4 workers * 20 = 80] $pool2 = new ConnectionPool( [ ‘minActive’ => 5, ‘maxActive’ => 20, ], new PhpRedisConnector, [ ‘host’ => ‘127.0.0.1’, ‘port’ => ‘6379’, ‘database’ => 0, ‘password’ => null, ]); $pool2->init(); $this->addConnectionPool(‘redis’, $pool2); }; $closePools = function () { $this->closeConnectionPools(); }; // Worker启动时创建MySQL和Redis连接池 $this->swoole->on(‘WorkerStart’, $createPools); // Worker正常退出或错误退出时,关闭连接池,释放连接 $this->swoole->on(‘WorkerStop’, $closePools); $this->swoole->on(‘WorkerError’, $closePools); } public function start() { $this->swoole->start(); }}// 启用协程Runtime来让PhpRedis扩展一键协程化Swoole\Runtime::enableCoroutine(true);$server = new HttpServer(‘0.0.0.0’, 5200);$server->start();贡献Github,欢迎 Star & PR。 ...

March 16, 2019 · 2 min · jiezi

深入理解channel:设计+源码

channel是大家在Go中用的最频繁的特性,也是Go最自豪的特性之一,你有没有思考过:Why:为什么要设计channel?What:channel是什么样的?How:channel是如何实现的?这篇文章,就来回答这3个问题。channel解决什么问题?在Golang诞生之前,各编程语言都使用多线程进行编程,但多线程复杂、混乱、难以管理,对开发者并不是多么友好。Golang是Google为了解决高并发搜索而设计的,它们想使用简单的方式,高效解决并发问题,最后做成了,然后又把Golang开源了出来,以及到处推广,所以Golang自从诞生之初,就风风火火。从Golang文档中,我们可以知道,为啥Golang设计了channel,以及channel解决了什么问题?Go Concurrency Patterns:Concurrency is the key to designing high performance network services. Go’s concurrency primitives (goroutines and channels) provide a simple and efficient means of expressing concurrent execution. In this talk we see how tricky concurrency problems can be solved gracefully with simple Go code.Golang使用goroutine和channel简单、高效的解决并发问题,channel解决的是goroutine之间的通信。channel是怎么设计的?我们以为channel是一个通道:实际上,channel的内在是这样的:channel设计涉及的数据结构很简单:基于数组的循环队列,有缓冲的channel用它暂存数据基于链表的单向队列,用于保存阻塞在此channel上的goroutine我本来想自己码一篇channel的设计文章,但已经有大牛:Kavya深入分析了Channel的设计,我也相信自己写的肯定不如他好,所以我把Kavya在Gopher Con上的PPT推荐给你,如果你希望成为Go大牛,你一定要读一下,现在请收藏好。Kavya在Gopher Con上的演讲主题是:理解channel,他并不是教你如何使用channel,而是把channel的设计和goroutine的调度结合起来,从内在方式向你介绍。这份PPT足足有80页,包含了大量的动画,非常容易理解,你会了解到:channel的创建各种场景的发送和接收goroutine的调度goroutine的阻塞和唤醒channel和goroutine在select操作下Kavya的PPT应该包含了channel的80%的设计思想,但也有一些缺失,需要你阅读源码:channel关闭时,gorontine的处理创建channel时,不同的创建方法读channel时的非阻塞操作…PPT在此:Understanding Channels,如果你有心,还可以在这个网站看到Kavya关于goroutine调度的PPT,福利哦????。(访问不了请翻墙,或阅读原文从博客文章最下面看Github备份)channel是怎么实现的?chan.go是channel的主要实现文件,只有700行,十分佩服Go团队,实现的如此精简,却发挥如此大的作用!!!看完Kavya的PPT,你已经可以直接看channel的源码了,如果有任何问题,思考一下你也可以想通,如果有任何问题可博客文章留言或公众号私信进行讨论。另外,推荐一篇在Medium(国外高质量文章社区)上获得500+赞的源码分析文章,非常详细。文章链接:Diving deep into the golang channels我学到了什么?阅读channel源码我学到了一些东西,分享给大家。channel的4个特性的实现:channel的goroutine安全,是通过mutex实现的。channel的FIFO,是通过循环队列实现的。channel的通信:在goroutine间传递数据,是通过仅共享hchan+数据拷贝实现的。channel的阻塞是通过goroutine自己挂起,唤醒goroutine是通过对方goroutine唤醒实现的。channel的其他实现:发送goroutine是可以访问接收goroutine的内存空间的,接收goroutine也是可以直接访问发送goroutine的内存空间的,看sendDirect、recvDirect函数。无缓冲的channel始终都是直接访问对方goroutine内存的方式,把手伸到别人的内存,把数据放到接收变量的内存,或者从发送goroutine的内存拷贝到自己内存。省掉了对方再加锁获取数据的过程。接收goroutine读不到数据和发送goroutine无法写入数据时,是把自己挂起的,这就是channel的阻塞操作。阻塞的接收goroutine是由发送goroutine唤醒的,阻塞的发送goroutine是由接收goroutine唤醒的,看gopark、goready函数在chan.go中的调用。接收goroutine当channel关闭时,读channel会得到0值,并不是channel保存了0值,而是它发现channel关闭了,把接收数据的变量的值设置为0值。channel的操作/调用,是通过reflect实现的,可以看reflect包的makechan, chansend, chanrecv函数。如果阅读chan_test.go还会学到一些骚操作,比如:if <-stopCh { // do stop}而不是写成:if stop := <-stopCh; stop { // do stop}这就是关于channel的设计和实现的分享,希望你通过Kavya的PPT和代码阅读能深入了解channel。链接chan.go:https://github.com/golang/go/…chan_test.go:https://github.com/golang/go/…Understanding channels在Github的备份: https://github.com/Shitaibin/…如果这篇文章对你有帮助,请点个赞/喜欢,感谢。本文作者:大彬如果喜欢本文,随意转载,但请保留此原文链接:http://www.lessisbetter.site/2019/03/03/golang-channel-design-and-source/

March 4, 2019 · 1 min · jiezi

Go优雅重启Web server示例-讲解版

本文参考 GRACEFULLY RESTARTING A GOLANG WEB SERVER进行归纳和说明。你也可以从这里拿到添加备注的代码版本。我做了下分割,方便你能看懂。问题因为 golang 是编译型的,所以当我们修改一个用 go 写的服务的配置后,需要重启该服务,有的甚至还需要重新编译,再发布。如果在重启的过程中有大量的请求涌入,能做的无非是分流,或者堵塞请求。不论哪一种,都不优雅~,所以slax0r以及他的团队,就试图探寻一种更加平滑的,便捷的重启方式。原文章中除了排版比较帅外,文字内容和说明还是比较少的,所以我希望自己补充一些说明。原理上述问题的根源在于,我们无法同时让两个服务,监听同一个端口。解决方案就是复制当前的 listen 文件,然后在新老进程之间通过 socket 直接传输参数和环境变量。新的开启,老的关掉,就这么简单。防看不懂须知Unix domain socket一切皆文件先玩一下运行程序,过程中打开一个新的 console,输入 kill -1 [进程号],你就能看到优雅重启的进程了。代码思路func main() { 主函数,初始化配置 调用serve()}func serve() { 核心运行函数 getListener() // 1. 获取监听 listener start() // 2. 用获取到的 listener 开启 server 服务 waitForSignal() // 3. 监听外部信号,用来控制程序 fork 还是 shutdown}func getListener() { 获取正在监听的端口对象 (第一次运行新建)}func start() { 运行 http server}func waitForSignal() { for { 等待外部信号 1. fork子进程 2. 关闭进程 }}上面是代码思路的说明,基本上我们就围绕这个大纲填充完善代码。定义结构体我们抽象出两个结构体,描述程序中公用的数据结构var cfg srvCfgtype listener struct { // Listener address Addr string json:"addr" // Listener file descriptor FD int json:"fd" // Listener file name Filename string json:"filename"}type srvCfg struct { sockFile string addr string ln net.Listener shutDownTimeout time.Duration childTimeout time.Duration}listener 是我们的监听者,他包含了监听地址,文件描述符,文件名。文件描述符其实就是进程所需要打开的文件的一个索引,非负整数。我们平时创建一个进程时候,linux都会默认打开三个文件,标准输入stdin,标准输出stdout,标准错误stderr,这三个文件各自占用了 0,1,2 三个文件描述符。所以之后你进程还要打开文件的话,就得从 3 开始了。这个listener,就是我们进程之间所要传输的数据了。srvCfg 是我们的全局环境配置,包含 socket file 路径,服务监听地址,监听者对象,父进程超时时间,子进程超时时间。因为是全局用的配置数据,我们先 var 一下。入口看看我们的 main 长什么样子func main() { serve(srvCfg{ sockFile: “/tmp/api.sock”, addr: “:8000”, shutDownTimeout: 5time.Second, childTimeout: 5*time.Second, }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Write([]byte(Hello, world!)) }))}func serve(config srvCfg, handler http.Handler) { cfg = &config var err error // get tcp listener cfg.ln, err = getListener() if err != nil { panic(err) } // return an http Server srv := start(handler) // create a wait routine err = waitForSignals(srv) if err != nil { panic(err) }}很简单,我们把配置都准备好了,然后还注册了一个 handler–输出 Hello, world!serve 函数的内容就和我们之前的思路一样,只不过多了些错误判断。接下去,我们一个一个看里面的函数…获取 listener也就是我们的 getListener() 函数func getListener() (net.Listener, error) { // 第一次执行不会 importListener ln, err := importListener() if err == nil { fmt.Printf(“imported listener file descriptor for addr: %s\n”, cfg.addr) return ln, nil } // 第一次执行会 createListener ln, err = createListener() if err != nil { return nil, err } return ln, err}func importListener() (net.Listener, error) { …}func createListener() (net.Listener, error) { fmt.Println(“首次创建 listener”, cfg.addr) ln, err := net.Listen(“tcp”, cfg.addr) if err != nil { return nil, err } return ln, err}因为第一次不会执行 importListener, 所以我们暂时不需要知道 importListener 里是怎么实现的。只肖明白 createListener 返回了一个监听对象。而后就是我们的 start 函数func start(handler http.Handler) *http.Server { srv := &http.Server{ Addr: cfg.addr, Handler: handler, } // start to serve go srv.Serve(cfg.ln) fmt.Println(“server 启动完成,配置信息为:",cfg.ln) return srv}很明显,start 传入一个 handler,然后协程运行一个 http server。监听信号监听信号应该是我们这篇里面重头戏的入口,我们首先来看下代码:func waitForSignals(srv *http.Server) error { sig := make(chan os.Signal, 1024) signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP) for { select { case s := <-sig: switch s { case syscall.SIGHUP: err := handleHangup() // 关闭 if err == nil { // no error occured - child spawned and started return shutdown(srv) } case syscall.SIGTERM, syscall.SIGINT: return shutdown(srv) } } }}首先建立了一个通道,这个通道用来接收系统发送到程序的命令,比如kill -9 myprog,这个 9 就是传到通道里的。我们用 Notify 来限制会产生响应的信号,这里有:SIGTERMSIGINTSIGHUP关于信号如果实在搞不清这三个信号的区别,只要明白我们通过区分信号,留给了进程自己判断处理的余地。然后我们开启了一个循环监听,显而易见地,监听的就是系统信号。当信号为 syscall.SIGHUP ,我们就要重启进程了。而当信号为 syscall.SIGTERM, syscall.SIGINT 时,我们直接关闭进程。于是乎,我们就要看看,handleHangup 里面到底做了什么。父子间的对话进程之间的优雅重启,我们可以看做是一次愉快的父子对话,爸爸给儿子开通了一个热线,爸爸通过热线把现在正在监听的端口信息告诉儿子,儿子在接受到必要的信息后,子承父业,开启新的空进程,告知爸爸,爸爸正式退休。func handleHangup() error { c := make(chan string) defer close(c) errChn := make(chan error) defer close(errChn) // 开启一个热线通道 go socketListener(c, errChn) for { select { case cmd := <-c: switch cmd { case “socket_opened”: p, err := fork() if err != nil { fmt.Printf(“unable to fork: %v\n”, err) continue } fmt.Printf(“forked (PID: %d), waiting for spinup”, p.Pid) case “listener_sent”: fmt.Println(“listener sent - shutting down”) return nil } case err := <-errChn: return err } } return nil}socketListener 开启了一个新的 unix socket 通道,同时监听通道的情况,并做相应的处理。处理的情况说白了就只有两种:通道开了,说明我可以造儿子了(fork),儿子来接爸爸的信息爸爸把监听对象文件都传给儿子了,爸爸完成使命handleHangup 里面的东西有点多,不要慌,我们一个一个来看。先来看 socketListener:func socketListener(chn chan<- string, errChn chan<- error) { // 创建 socket 服务端 fmt.Println(“创建新的socket通道”) ln, err := net.Listen(“unix”, cfg.sockFile) if err != nil { errChn <- err return } defer ln.Close() // signal that we created a socket fmt.Println(“通道已经打开,可以 fork 了”) chn <- “socket_opened” // accept // 阻塞等待子进程连接进来 c, err := acceptConn(ln) if err != nil { errChn <- err return } // read from the socket buf := make([]byte, 512) nr, err := c.Read(buf) if err != nil { errChn <- err return } data := buf[0:nr] fmt.Println(“获得消息子进程消息”, string(data)) switch string(data) { case “get_listener”: fmt.Println(“子进程请求 listener 信息,开始传送给他吧~”) err := sendListener(c) // 发送文件描述到新的子进程,用来 import Listener if err != nil { errChn <- err return } // 传送完毕 fmt.Println(“listener 信息传送完毕”) chn <- “listener_sent” }}sockectListener创建了一个 unix socket 通道,创建完毕后先发送了 socket_opened 这个信息。这时候 handleHangup 里的 case “socket_opened” 就会有反应了。同时,socketListener 一直在 accept 阻塞等待新程序的信号,从而发送原 listener 的文件信息。直到发送完毕,才会再告知 handlerHangup listener_sent。下面是 acceptConn 的代码,并没有复杂的逻辑,就是等待子程序请求、处理超时和错误。func acceptConn(l net.Listener) (c net.Conn, err error) { chn := make(chan error) go func() { defer close(chn) fmt.Printf(“accept 新连接%+v\n”, l) c, err = l.Accept() if err != nil { chn <- err } }() select { case err = <-chn: if err != nil { fmt.Printf(“error occurred when accepting socket connection: %v\n”, err) } case <-time.After(cfg.childTimeout): fmt.Println(“timeout occurred waiting for connection from child”) } return}还记的我们之前定义的 listener 结构体吗?这时候就要派上用场了:func sendListener(c net.Conn) error { fmt.Printf(“发送老的 listener 文件 %+v\n”, cfg.ln) lnFile, err := getListenerFile(cfg.ln) if err != nil { return err } defer lnFile.Close() l := listener{ Addr: cfg.addr, FD: 3, // 文件描述符,进程初始化描述符为0 stdin 1 stdout 2 stderr,所以我们从3开始 Filename: lnFile.Name(), } lnEnv, err := json.Marshal(l) if err != nil { return err } fmt.Printf(“将 %+v\n 写入连接\n”, string(lnEnv)) _, err = c.Write(lnEnv) if err != nil { return err } return nil}func getListenerFile(ln net.Listener) (*os.File, error) { switch t := ln.(type) { case *net.TCPListener: return t.File() case *net.UnixListener: return t.File() } return nil, fmt.Errorf(“unsupported listener: %T”, ln)}sendListener 先将我们正在使用的tcp监听文件(一切皆文件)做了一份拷贝,并把必要的信息塞进了listener 结构体中,序列化后用 unix socket 传输给新的子进程。说了这么多都是爸爸进程的代码,中间我们跳过了创建子进程,那下面我们来看看 fork,也是一个重头戏:func fork() (*os.Process, error) { // 拿到原监听文件描述符并打包到元数据中 lnFile, err := getListenerFile(cfg.ln) fmt.Printf(“拿到监听文件 %+v\n,开始创建新进程\n”, lnFile.Name()) if err != nil { return nil, err } defer lnFile.Close() // 创建子进程时必须要塞的几个文件 files := []*os.File{ os.Stdin, os.Stdout, os.Stderr, lnFile, } // 拿到新进程的程序名,因为我们是重启,所以就是当前运行的程序名字 execName, err := os.Executable() if err != nil { return nil, err } execDir := filepath.Dir(execName) // 生孩子了 p, err := os.StartProcess(execName, []string{execName}, &os.ProcAttr{ Dir: execDir, Files: files, Sys: &syscall.SysProcAttr{}, }) fmt.Println(“创建子进程成功”) if err != nil { return nil, err } // 这里返回 nil 后就会直接 shutdown 爸爸进程 return p, nil}当执行 StartProcess 的那一刻,你会意识到,子进程的执行会回到最初的地方,也就是 main 开始。这时候,我们 获取 listener中的 importListener 方法就会被激活:func importListener() (net.Listener, error) { // 向已经准备好的 unix socket 建立连接,这个是爸爸进程在之前就建立好的 c, err := net.Dial(“unix”, cfg.sockFile) if err != nil { fmt.Println(“no unix socket now”) return nil, err } defer c.Close() fmt.Println(“准备导入原 listener 文件…”) var lnEnv string wg := sync.WaitGroup{} wg.Add(1) go func(r io.Reader) { defer wg.Done() // 读取 conn 中的内容 buf := make([]byte, 1024) n, err := r.Read(buf[:]) if err != nil { return } lnEnv = string(buf[0:n]) }(c) // 写入 get_listener fmt.Println(“告诉爸爸我要 ‘get-listener’ 了”) _, err = c.Write([]byte(“get_listener”)) if err != nil { return nil, err } wg.Wait() // 等待爸爸传给我们参数 if lnEnv == "” { return nil, fmt.Errorf(“Listener info not received from socket”) } var l listener err = json.Unmarshal([]byte(lnEnv), &l) if err != nil { return nil, err } if l.Addr != cfg.addr { return nil, fmt.Errorf(“unable to find listener for %v”, cfg.addr) } // the file has already been passed to this process, extract the file // descriptor and name from the metadata to rebuild/find the *os.File for // the listener. // 我们已经拿到了监听文件的信息,我们准备自己创建一份新的文件并使用 lnFile := os.NewFile(uintptr(l.FD), l.Filename) fmt.Println(“新文件名:”, l.Filename) if lnFile == nil { return nil, fmt.Errorf(“unable to create listener file: %v”, l.Filename) } defer lnFile.Close() // create a listerer with the *os.File ln, err := net.FileListener(lnFile) if err != nil { return nil, err } return ln, nil}这里的 importListener 执行时间,就是在父进程创建完新的 unix socket 通道后。至此,子进程开始了新的一轮监听,服务…结束代码量虽然不大,但是传递了一个很好的优雅重启思路,有些地方还是要实践一下才能理解(对于我这种新手而言)。其实网上还有很多其他优雅重启的方式,大家可以 Google 一下。希望我上面简单的讲解能够帮到你,如果有错误的话请及时指出,我会更正的。你也可以从这里拿到添加备注的代码版本。我做了下分割,方便你能看懂。 ...

January 23, 2019 · 6 min · jiezi

总结了才知道,原来channel有这么多用法!

这篇文章总结了channel的10种常用操作,以一个更高的视角看待channel,会给大家带来对channel更全面的认识。在介绍10种操作前,先简要介绍下channel的使用场景、基本操作和注意事项。channel的使用场景把channel用在数据流动的地方:消息传递、消息过滤信号广播事件订阅与广播请求、响应转发任务分发结果汇总并发控制同步与异步…channel的基本操作和注意事项channel存在3种状态:nil,未初始化的状态,只进行了声明,或者手动赋值为nilactive,正常的channel,可读或者可写closed,已关闭,千万不要误认为关闭channel后,channel的值是nilchannel可进行3种操作:读写关闭把这3种操作和3种channel状态可以组合出9种情况:对于nil通道的情况,也并非完全遵循上表,有1个特殊场景:当nil的通道在select的某个case中时,这个case会阻塞,但不会造成死锁。参考代码请看:https://dave.cheney.net/2014/…下面介绍使用channel的10种常用操作。1. 使用for range读channel场景:当需要不断从channel读取数据时原理:使用for-range读取channel,这样既安全又便利,当channel关闭时,for循环会自动退出,无需主动监测channel是否关闭,可以防止读取已经关闭的channel,造成读到数据为通道所存储的数据类型的零值。用法:for x := range ch{ fmt.Println(x)}2. 使用_,ok判断channel是否关闭场景:读channel,但不确定channel是否关闭时原理:读已关闭的channel会造成panic,如果不确定channel,需要使用ok进行检测。ok的结果和含义:true:读到数据,并且通道没有关闭。false:通道关闭,无数据读到。用法:if v, ok := <- ch; ok { fmt.Println(v)}3. 使用select处理多个channel场景:需要对多个通道进行同时处理,但只处理最先发生的channel时原理:select可以同时监控多个通道的情况,只处理未阻塞的case。当通道为nil时,对应的case永远为阻塞,无论读写。特殊关注:普通情况下,对nil的通道写操作是要panic的。用法:// 分配job时,如果收到关闭的通知则退出,不分配jobfunc (h *Handler) handle(job *Job) { select { case h.jobCh<-job: return case <-h.stopCh: return }}4. 使用channel的声明控制读写权限场景:协程对某个通道只读或只写时目的:A. 使代码更易读、更易维护,B. 防止只读协程对通道进行写数据,但通道已关闭,造成panic。用法:如果协程对某个channel只有写操作,则这个channel声明为只写。如果协程对某个channel只有读操作,则这个channe声明为只读。// 只有generator进行对outCh进行写操作,返回声明// <-chan int,可以防止其他协程乱用此通道,造成隐藏bugfunc generator(int n) <-chan int { outCh := make(chan int) go func(){ for i:=0;i<n;i++{ outCh<-i } }() return outCh}// consumer只读inCh的数据,声明为<-chan int// 可以防止它向inCh写数据func consumer(inCh <-chan int) { for x := range inCh { fmt.Println(x) }}5. 使用缓冲channel增强并发和异步场景:异步和并发原理:A. 有缓冲通道是异步的,无缓冲通道是同步的,B. 有缓冲通道可供多个协程同时处理,在一定程度可提高并发性。用法:// 无缓冲,同步ch1 := make(chan int)ch2 := make(chan int, 0)// 有缓冲,异步ch3 := make(chan int, 1)// 使用5个do协程同时处理输入数据func test() { inCh := generator(100) outCh := make(chan int, 10) for i := 0; i < 5; i++ { go do(inCh, outCh) } for r := range outCh { fmt.Println(r) }}func do(inCh <-chan int, outCh chan<- int) { for v := range inCh { outCh <- v * v }}6. 为操作加上超时场景:需要超时控制的操作原理:使用select和time.After,看操作和定时器哪个先返回,处理先完成的,就达到了超时控制的效果用法:func doWithTimeOut(timeout time.Duration) (int, error) { select { case ret := <-do(): return ret, nil case <-time.After(timeout): return 0, errors.New(“timeout”) }}func do() <-chan int { outCh := make(chan int) go func() { // do work }() return outCh}7. 使用time实现channel无阻塞读写场景:并不希望在channel的读写上浪费时间原理:是为操作加上超时的扩展,这里的操作是channel的读或写用法:func unBlockRead(ch chan int) (x int, err error) { select { case x = <-ch: return x, nil case <-time.After(time.Microsecond): return 0, errors.New(“read time out”) }}func unBlockWrite(ch chan int, x int) (err error) { select { case ch <- x: return nil case <-time.After(time.Microsecond): return errors.New(“read time out”) }}注:time.After等待可以替换为default,则是channel阻塞时,立即返回的效果8. 使用close(ch)关闭所有下游协程场景:退出时,显示通知所有协程退出原理:所有读ch的协程都会收到close(ch)的信号用法:func (h *Handler) Stop() { close(h.stopCh) // 可以使用WaitGroup等待所有协程退出}// 收到停止后,不再处理请求func (h *Handler) loop() error { for { select { case req := <-h.reqCh: go handle(req) case <-h.stopCh: return } }}9. 使用chan struct{}作为信号channel场景:使用channel传递信号,而不是传递数据时原理:没数据需要传递时,传递空struct用法:// 上例中的Handler.stopCh就是一个例子,stopCh并不需要传递任何数据// 只是要给所有协程发送退出的信号type Handler struct { stopCh chan struct{} reqCh chan *Request}10. 使用channel传递结构体的指针而非结构体场景:使用channel传递结构体数据时原理:channel本质上传递的是数据的拷贝,拷贝的数据越小传输效率越高,传递结构体指针,比传递结构体更高效用法:reqCh chan *Request// 好过reqCh chan Request你有哪些channel的奇淫巧技,说来看看?如果这篇文章对你有帮助,请点个赞/喜欢,感谢。本文作者:大彬如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/01/20/golang-channel-all-usage/ ...

January 21, 2019 · 2 min · jiezi

Golang并发:再也不愁选channel还是选锁

周末又到了,为大家准备了一份实用干货:如何使用channel和Mutex解决并发问题,利用周末的好时光,配上音乐,思考一下吧????。来,问自己个问题:面对并发问题,是用channel解决,还是用Mutex解决?如果自己心里还没有清晰的答案,那就读下这篇文章,你会了解到:使用channel解决并发问题的核心思路和示例channel擅长解决什么样的并发问题,Mutex擅长解决什么样的并发问题一个并发问题该怎么入手解解决一个重要的plus思维前戏前面很多篇的文章都在围绕channel介绍,而只有前一篇sync的文章介绍到了Mutex,不是我偏心,而是channel在Golang是first class级别的,设计在语言特性中的,而Mutex只是一个包中的。这就注定了一个是主角,一个是配角。并且Golang还有一个并发座右铭,在《Effective Go》的channel介绍中写到:Share memory by communicating, don’t communicate by sharing memory.通过通信共享内存,而不是通过共享内存而通信。Golang以如此明显的方式告诉我们:面对并发问题,你首先想到的应该是channel,因为channel是线程安全的并且不会有数据冲突,比锁好用多了。既生瑜,何生亮。既然有channel了,为啥还提供sync.Mutex呢?主角不是万能的,他也需要配角。在Golang里,channel也不是万能的,这是由channel的特性和局限造成的。下面就给大家介绍channel的特点、核心方法和缺点。channel解决并发问题的思路和示例channel的核心是数据流动,关注到并发问题中的数据流动,把流动的数据放到channel中,就能使用channel解决这个并发问题。这个思路是从Go语言的核心开发者的演讲中学来的,然而视频我已经找不到了,不然直接共享给大家,他提到了Golang并发的核心实践的4个点:DataFlow -> Drawing -> Pipieline -> ExitingDataFlow指数据流动,Drawing指把数据流动画出来,Pipeline指的是流水线,Exit指协程的退出。DataFlow + Drawing就是我提到到channel解决并发问题的思路,Pipeline和Exit是具体的实践模式,Pipeline和Exit我都写过文章,有需要自取:Golang并发模型:轻松入门流水线模型Golang并发模型:轻松入门流水线FAN模式Golang并发模型:并发协程的优雅退出下面我使用例子具体解释DataFlow + Drawing。借用《Golang并发的次优选择:sync包》中银行的例子,介绍如何使用channel解决例子中银行的并发问题:银行支持多个用户的同时操作。顺便看下同一个并发问题,使用channel和Mutex解决是什么差别。一起分析下多个用户同时操作银行的数据流动:每个人都可以向银行发起请求,请求可以是存、取、查3种操作,并且包含操作时必要的数据,包含的数据只和自身相关。银行处理请求后给用户发送响应,包含的数据只和操作用户相关。你一定发现了上面的数据流动:请求数据:个人请求数据流向银行。响应数据:银行处理结果数据流向用户。channel是数据流动的通道/管道,为流动的数据建立通道,这里需要建立2类channel:reqCh:传送请求的channel,把请求从个人发送给银行。retCh:传送响应的channel,把响应从银行发给个人。我们把channel添加到上图中,得到下面的图:以上就是从数据流动的角度,发现如何使用channel解决并发问题。思路有了,再思考下代码层面需要怎么做:银行:定义银行,只保存1个map即可银行操作:接收和解析请求,并把请求分发给存、取、查函数实现存、取、查函数:处理请求,并把结果写入到用户提供的响应通道定义请求和响应用户:创建请求和接收响应的通道,发送请求后等待响应,提取响应结果mian函数:创建银行和用户间的请求通道,创建银行、用户等协程,并等待操作完成以上,我们这个并发问题的逻辑实现和各块工作就清晰了,写起来也方便、简单。代码实现有200多行,公众号不方便查看,可以点阅读原文,一键直达。代码不能贴了,运行结果还是可以的,为了方便理解结果,介绍下示例代码做了什么。main函数创建了银行、小明、小刚3个并发协程:银行:从reqCh接收请求,依次处理每个请求,直到通道关闭,把请求交给处理函数,处理函数把结果写入到请求中的retCh。用户小明:创建了存100、取20、查余额的3个请求,每个请求得到响应后,再把下一个请求写入到reqCh。用户小刚:流程和小明相同,但存100取200,造成取钱操作失败,他查询下自己又多少钱,得到100。main函数最后使用WaitGroup等待小明、小刚结束后退出。下面是运行结果:$ go run channel_map.goxiaogang deposite 100 successxiaoming deposite 100 successxiaogang withdraw 200 failedxiaoming withdraw 20 successxiaogang has 100xiaoming has 80Bank exit这一遭搞完,发现啥没有?用Mutex直接加锁、解锁完事了,但channel搞出来一坨,是不是用channel解决这个问题不太适合?是的。对于当前这个问题,和Mutex的方案相比,channel的方案显的有点“重”,不够简洁、高效、易用。但这个例子展示了3点:使用channel解决并发问题的核心在于关注数据的流动channel不一定是某个并发问题最好的解决方案map在并发中,可以不用锁进行保护,而是使用channel现在,回到了开篇的问题:同一个并发问题,你是用channel解决,还是用mutex解决?下面,一起看看怎么选择。channel和mutex的选择面对一个并发问题的时候,应当选择合适的并发方式:channel还是mutex。选择的依据是他们的能力/特性:channel的能力是让数据流动起来,擅长的是数据流动的场景,《Channel or Mutex》中给了3个数据流动的场景:传递数据的所有权,即把某个数据发送给其他协程分发任务,每个任务都是一个数据交流异步结果,结果是一个数据mutex的能力是数据不动,某段时间只给一个协程访问数据的权限擅长数据位置固定的场景,《Channel or Mutex》中给了2个数据不动场景:缓存状态,我们银行例子中的map就是一种状态提供解决并发问题的一个思路:先找到数据的流动,并且还要画出来,数据流动的路径换成channel,channel的两端设计成协程基于画出来的图设计简要的channel方案,代码需要做什么这个方案是不是有点复杂,是不是用Mutex更好一点?设计一个简要的Mutex方案,对比&选择易做的、高效的channel + mutex思维面对并发问题,除了channel or mutex,你还有另外一个选择:channel plus mutex。一个大并发问题,可以分解成很多小的并发问题,每个小的并发都可以单独选型:channel or mutex。但对于整个大的问题,通常不是channel or mutex,而是channel plus mutex。如果你是认为是channel and mutex也行,但我更喜欢plus,体现相互配合。总结读到这里,感觉这篇文章头重脚轻,channel的讲了很多,而channel和mutex的选择却讲的很少。在channel和mutex的选择,实际并没有一个固定答案,也没有固定的方法,但提供了一个简单的思路:设计出channel和Mutex的简单方案,然后选择最适合当前业务、问题的那个。思考比结论更重要,希望你有所收获:关注数据的流动,就可以使用channel解决并发问题。不流动的数据,如果存在并发访问,尝试使用sync.Mutex保护数据。channel不一定某个并发问题的最优解。不要害怕、拒绝使用mutex,如果mutex是问题的最优解,那就大胆使用。对于大问题,channel plus mutex也许才是更好的方案。参考资料《Effective Go》,https://golang.org/doc/effect…《Mutex Or Channel》,https://github.com/golang/go/…文章推荐Golang并发模型:轻松入门流水线模型Golang并发模型:轻松入门流水线FAN模式Golang并发模型:并发协程的优雅退出Golang并发的次优选择:sync包如果这篇文章对你有帮助,请点个赞/喜欢,感谢。本文作者:大彬如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/01/14/golang-channel-and-mutex/

January 14, 2019 · 1 min · jiezi

Golang并发:除了channle,你还有其他选择

我们都知道Golang并发优选channel,但channel不是万能的,Golang为我们提供了另一种选择:sync。通过这篇文章,你会了解sync包最基础、最常用的方法,至于sync和channel之争留给下一篇文章。sync包提供了基础的异步操作方法,比如互斥锁(Mutex)、单次执行(Once)和等待组(WaitGroup),这些异步操作主要是为低级库提供,上层的异步/并发操作最好选用通道和通信。sync包提供了:Mutex:互斥锁RWMutex:读写锁WaitGroup:等待组Once:单次执行Cond:信号量Pool:临时对象池Map:自带锁的map这篇文章是sync包的入门文章,所以只介绍常用的结构和方法:Mutex、RWMutex、WaitGroup、Once,而Cond、Pool和Map留给大家自行探索,或有需求再介绍。互斥锁常做并发工作的朋友对互斥锁应该不陌生,Golang里互斥锁需要确保的是某段时间内,不能有多个协程同时访问一段代码(临界区)。互斥锁被称为Mutex,它有2个函数,Lock()和Unlock()分别是获取锁和释放锁,如下:type Mutexfunc (m *Mutex) Lock(){}func (m *Mutex) Unlock(){}Mutex的初始值为未锁的状态,并且Mutex通常作为结构体的匿名成员存在。经过了上面这么“官方”的介绍,举个例子:你在工商银行有100元存款,这张卡绑定了支付宝和微信,在中午12点你用支付宝支付外卖30元,你在微信发红包,抢到10块。银行需要按顺序执行上面两件事,先减30再加10或者先加10再减30,结果都是80,但如果同时执行,结果可能是,只减了30或者只加了10,即你有70元或者你有110元。前一个结果是你赔了,后一个结果是银行赔了,银行可不希望把这种事算错。看看实际使用吧:创建一个银行,银行里存每个账户的钱,存储查询都加了锁操作,这样银行就不会算错账了。银行的定义:type Bank struct { sync.Mutex saving map[string]int // 每账户的存款金额}func NewBank() *Bank { b := &Bank{ saving: make(map[string]int), } return b}银行的存取钱:// Deposit 存款func (b *Bank) Deposit(name string, amount int) { b.Lock() defer b.Unlock() if _, ok := b.saving[name]; !ok { b.saving[name] = 0 } b.saving[name] += amount}// Withdraw 取款,返回实际取到的金额func (b *Bank) Withdraw(name string, amount int) int { b.Lock() defer b.Unlock() if _, ok := b.saving[name]; !ok { return 0 } if b.saving[name] < amount { amount = b.saving[name] } b.saving[name] -= amount return amount}// Query 查询余额func (b *Bank) Query(name string) int { b.Lock() defer b.Unlock() if _, ok := b.saving[name]; !ok { return 0 } return b.saving[name]}模拟操作:小米支付宝存了100,并且同时花了20。func main() { b := NewBank() go b.Deposit(“xiaoming”, 100) go b.Withdraw(“xiaoming”, 20) go b.Deposit(“xiaogang”, 2000) time.Sleep(time.Second) fmt.Printf(“xiaoming has: %d\n”, b.Query(“xiaoming”)) fmt.Printf(“xiaogang has: %d\n”, b.Query(“xiaogang”))}结果:先存后花。➜ sync_pkg git:(master) ✗ go run mutex.goxiaoming has: 80xiaogang has: 2000也可能是:先花后存,因为先花20,因为小明没钱,所以没花出去。➜ sync_pkg git:(master) ✗ go run mutex.goxiaoming has: 100xiaogang has: 2000这个例子只是介绍了mutex的基本使用,如果你想多研究下mutex,那就去我的Github(阅读原文)下载下来代码,自己修改测试。Github中还提供了没有锁的例子,运行多次总能碰到错误:fatal error: concurrent map writes这是由于并发访问map造成的。读写锁读写锁是互斥锁的特殊变种,如果是计算机基本知识扎实的朋友会知道,读写锁来自于读者和写者的问题,这个问题就不介绍了,介绍下我们的重点:读写锁要达到的效果是同一时间可以允许多个协程读数据,但只能有且只有1个协程写数据。也就是说,读和写是互斥的,写和写也是互斥的,但读和读并不互斥。具体讲,当有至少1个协程读时,如果需要进行写,就必须等待所有已经在读的协程结束读操作,写操作的协程才获得锁进行写数据。当写数据的协程已经在进行时,有其他协程需要进行读或者写,就必须等待已经在写的协程结束写操作。读写锁是RWMutex,它有5个函数,它需要为读操作和写操作分别提供锁操作,这样就4个了:Lock()和Unlock()是给写操作用的。RLock()和RUnlock()是给读操作用的。RLocker()能获取读锁,然后传递给其他协程使用。使用较少。type RWMutexfunc (rw *RWMutex) Lock(){}func (rw *RWMutex) RLock(){}func (rw *RWMutex) RLocker() Locker{}func (rw *RWMutex) RUnlock(){}func (rw *RWMutex) Unlock(){}上面的银行实现不合理:大家都是拿手机APP查余额,可以同时几个人一起查呀,这根本不影响,银行的锁可以换成读写锁。存、取钱是写操作,查询金额是读操作,代码修改如下,其他不变:type Bank struct { sync.RWMutex saving map[string]int // 每账户的存款金额}// Query 查询余额func (b *Bank) Query(name string) int { b.RLock() defer b.RUnlock() if _, ok := b.saving[name]; !ok { return 0 } return b.saving[name]}func main() { b := NewBank() go b.Deposit(“xiaoming”, 100) go b.Withdraw(“xiaoming”, 20) go b.Deposit(“xiaogang”, 2000) time.Sleep(time.Second) print := func(name string) { fmt.Printf("%s has: %d\n", name, b.Query(name)) } nameList := []string{“xiaoming”, “xiaogang”, “xiaohong”, “xiaozhang”} for _, name := range nameList { go print(name) } time.Sleep(time.Second)}结果,可能不一样,因为协程都是并发执行的,执行顺序不固定:➜ sync_pkg git:(master) ✗ go run rwmutex.goxiaohong has: 0xiaozhang has: 0xiaogang has: 2000xiaoming has: 100等待组互斥锁和读写锁大多数人可能比较熟悉,而对等待组(WaitGroup)可能就不那么熟悉,甚至有点陌生,所以先来介绍下等待组在现实中的例子。你们团队有5个人,你作为队长要带领大家打开藏有宝藏的箱子,但这个箱子需要4把钥匙才能同时打开,你把寻找4把钥匙的任务,分配给4个队员,让他们分别去寻找,而你则守着宝箱,在这等待,等他们都找到回来后,一起插进钥匙打开宝箱。这其中有个很重要的过程叫等待:等待一些工作完成后,再进行下一步的工作。如果使用Golang实现,就得使用等待组。等待组是WaitGroup,它有3个函数:Add():在被等待的协程启动前加1,代表要等待1个协程。Done():被等待的协程执行Done,代表该协程已经完成任务,通知等待协程。Wait(): 等待其他协程的协程,使用Wait进行等待。type WaitGroupfunc (wg *WaitGroup) Add(delta int){}func (wg *WaitGroup) Done(){}func (wg *WaitGroup) Wait(){}来,一起看下怎么用WaitGroup实现上面的问题。队长先创建一个WaitGroup对象wg,每个队员都是1个协程, 队长让队员出发前,使用wg.Add(),队员出发寻找钥匙,队长使用wg.Wait()等待(阻塞)所有队员完成,某个队员完成时执行wg.Done(),等所有队员找到钥匙,wg.Wait()则返回,完成了等待的过程,接下来就是开箱。结合之前的协程池的例子,修改成WG等待协程池协程退出,实例代码:func leader() { var wg sync.WaitGroup wg.Add(4) for i := 0; i < 4; i++ { go follower(&wg, i) } wg.Wait() fmt.Println(“open the box together”)}func follower(wg *sync.WaitGroup, id int) { fmt.Printf(“follwer %d find key\n”, id) wg.Done()}结果:➜ sync_pkg git:(master) ✗ go run waitgroup.gofollwer 3 find keyfollwer 1 find keyfollwer 0 find keyfollwer 2 find keyopen the box togetherWaitGroup也常用在协程池的处理上,协程池等待所有协程退出,把上篇文章《Golang并发模型:轻松入门协程池》的例子改下:package mainimport ( “fmt” “sync”)func main() { var once sync.Once onceBody := func() { fmt.Println(“Only once”) } done := make(chan bool) for i := 0; i < 10; i++ { go func() { once.Do(onceBody) done <- true }() } for i := 0; i < 10; i++ { <-done }}单次执行在程序执行前,通常需要做一些初始化操作,但触发初始化操作的地方是有多处的,但是这个初始化又只能执行1次,怎么办呢?使用Once就能轻松解决,once对象是用来存放1个无入参无返回值的函数,once可以确保这个函数只被执行1次。type Oncefunc (o *Once) Do(f func()){}直接把官方代码给大家搬过来看下,once在10个协程中调用,但once中的函数onceBody()只执行了1次:package mainimport ( “fmt” “sync”)func main() { var once sync.Once onceBody := func() { fmt.Println(“Only once”) } done := make(chan bool) for i := 0; i < 10; i++ { go func() { once.Do(onceBody) done <- true }() } for i := 0; i < 10; i++ { <-done }}结果:➜ sync_pkg git:(master) ✗ go run once.goOnly once示例源码本文所有示例源码,及历史文章、代码都存储在Github:https://github.com/Shitaibin/golang_step_by_step/tree/master/sync_pkg下期预告这次先介绍入门的知识,下次再介绍一些深入思考、最佳实践,不能一口吃个胖子,咱们慢慢来,顺序渐进。下一篇我以这些主题进行介绍,欢迎关注:哪个协程先获取锁一定要用锁吗锁与通道的选择文章推荐Golang并发模型:轻松入门流水线模型Golang并发模型:轻松入门流水线FAN模式Golang并发模型:并发协程的优雅退出Golang并发模型:轻松入门selectGolang并发模型:select进阶Golang并发模型:轻松入门协程池Golang并发的次优选择:sync包如果这篇文章对你有帮助,请点个赞/喜欢,感谢。本文作者:大彬如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/01/04/golang-pkg-sync/ ...

January 5, 2019 · 3 min · jiezi

Golang并发:一招掌握无阻塞通道读写

介绍Golang并发的模型写了几篇了,但一直没有以channel为主题进行介绍,今天就给大家聊一聊channel,channel的基本使用非常简单,想必大家都已了解,所以直接来个进阶点的:介绍channel的阻塞情况,以及给你一个必杀技,立马解决阻塞问题,实用性高。阻塞场景无论是有缓存通道、无缓冲通道都存在阻塞的情况。阻塞场景共4个,有缓存和无缓冲各2个。无缓冲通道的特点是,发送的数据需要被读取后,发送才会完成,它阻塞场景:通道中无数据,但执行读通道。通道中无数据,向通道写数据,但无协程读取。// 场景1func ReadNoDataFromNoBufCh() { noBufCh := make(chan int) <-noBufCh fmt.Println(“read from no buffer channel success”) // Output: // fatal error: all goroutines are asleep - deadlock!}// 场景2func WriteNoBufCh() { ch := make(chan int) ch <- 1 fmt.Println(“write success no block”) // Output: // fatal error: all goroutines are asleep - deadlock!}注:示例代码中的Output注释代表函数的执行结果,每一个函数都由于阻塞在通道操作而无法继续向下执行,最后报了死锁错误。有缓存通道的特点是,有缓存时可以向通道中写入数据后直接返回,缓存中有数据时可以从通道中读到数据直接返回,这时有缓存通道是不会阻塞的,它阻塞场景是:通道的缓存无数据,但执行读通道。通道的缓存已经占满,向通道写数据,但无协程读。// 场景1func ReadNoDataFromBufCh() { bufCh := make(chan int, 1) <-bufCh fmt.Println(“read from no buffer channel success”) // Output: // fatal error: all goroutines are asleep - deadlock!}// 场景2func WriteBufChButFull() { ch := make(chan int, 1) // make ch full ch <- 100 ch <- 1 fmt.Println(“write success no block”) // Output: // fatal error: all goroutines are asleep - deadlock!}使用Select实现无阻塞读写select是执行选择操作的一个结构,它里面有一组case语句,它会执行其中无阻塞的那一个,如果都阻塞了,那就等待其中一个不阻塞,进而继续执行,它有一个default语句,该语句是永远不会阻塞的,我们可以借助它实现无阻塞的操作。如果不了解,不想多了解一下select可以先看下这2篇文章:Golang并发模型:轻松入门selectGolang并发模型:select进阶下面示例代码是使用select修改后的无缓冲通道和有缓冲通道的读写,以下函数可以直接通过main函数调用,其中的Ouput的注释是运行结果,从结果能看出,在通道不可读或者不可写的时候,不再阻塞等待,而是直接返回。// 无缓冲通道读func ReadNoDataFromNoBufChWithSelect() { bufCh := make(chan int) if v, err := ReadWithSelect(bufCh); err != nil { fmt.Println(err) } else { fmt.Printf(“read: %d\n”, v) } // Output: // channel has no data}// 有缓冲通道读func ReadNoDataFromBufChWithSelect() { bufCh := make(chan int, 1) if v, err := ReadWithSelect(bufCh); err != nil { fmt.Println(err) } else { fmt.Printf(“read: %d\n”, v) } // Output: // channel has no data}// select结构实现通道读func ReadWithSelect(ch chan int) (x int, err error) { select { case x = <-ch: return x, nil default: return 0, errors.New(“channel has no data”) }}// 无缓冲通道写func WriteNoBufChWithSelect() { ch := make(chan int) if err := WriteChWithSelect(ch); err != nil { fmt.Println(err) } else { fmt.Println(“write success”) } // Output: // channel blocked, can not write}// 有缓冲通道写func WriteBufChButFullWithSelect() { ch := make(chan int, 1) // make ch full ch <- 100 if err := WriteChWithSelect(ch); err != nil { fmt.Println(err) } else { fmt.Println(“write success”) } // Output: // channel blocked, can not write}// select结构实现通道写func WriteChWithSelect(ch chan int) error { select { case ch <- 1: return nil default: return errors.New(“channel blocked, can not write”) }}使用Select+超时改善无阻塞读写使用default实现的无阻塞通道阻塞有一个缺陷:当通道不可读或写的时候,会即可返回。实际场景,更多的需求是,我们希望尝试读一会数据,或者尝试写一会数据,如果实在没法读写再返回,程序继续做其它的事情。使用定时器替代default可以解决这个问题,给通道增加读写数据的容忍时间,如果500ms内无法读写,就即刻返回。示例代码修改一下会是这样:func ReadWithSelect(ch chan int) (x int, err error) { timeout := time.NewTimer(time.Microsecond * 500) select { case x = <-ch: return x, nil case <-timeout.C: return 0, errors.New(“read time out”) }}func WriteChWithSelect(ch chan int) error { timeout := time.NewTimer(time.Microsecond * 500) select { case ch <- 1: return nil case <-timeout.C: return errors.New(“write time out”) }}结果就会变成超时返回:read time outwrite time outread time outwrite time out示例源码本文所有示例源码,及历史文章、代码都存储在Github:https://github.com/Shitaibin/…这篇文章了channel的阻塞情况,以及解决阻塞的2种办法:使用select的default语句,在channel不可读写时,即可返回使用select+定时器,在超时时间内,channel不可读写,则返回希望这篇文章对你的channel读写有所启发。如果这篇文章对你有帮助,请点个赞/喜欢,感谢。本文作者:大彬如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2018/11/03/Golang-channel-read-and-write-without-blocking/ ...

December 27, 2018 · 2 min · jiezi

Netty中的Channel之数据冲刷与线程安全(writeAndFlush)

本文首发个人博客:猫叔的博客 | MySelfGitHub项目地址InChat一个轻量级、高效率的支持多端(应用与硬件Iot)的异步网络应用通讯框架前言本文预设读者已经了解了一定的Netty基础知识,并能够自己构建一个Netty的通信服务(包括客户端与服务端)。那么你一定使用到了Channel,这是Netty对传统JavaIO、NIO的链接封装实例。那么接下来让我们来了解一下关于Channel的数据冲刷与线程安全吧。数据冲刷的步骤1、获取一个链接实例@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取链接实例 Channel channel = ctx.channel();}我将案例放在初学者最熟悉的channelRead方法中,这是一个数据接收的方法,我们自实现Netty的消息处理接口时需要重写的方法。即客户端发送消息后,这个方法会被触发调用,所以我们在这个方法中进行本次内容的讲解。由上一段代码,其实目前还是很简单,我们借助ChannelHandlerContext(这是一个ChannelHandler与ChannelPipeline相交互并对接的一个对象。如下是源码的解释)来获取目前的链接实例Channel。/* Enables a {@link ChannelHandler} to interact with its {@link ChannelPipeline} * and other handlers. Among other things a handler can notify the next {@link ChannelHandler} in the * {@link ChannelPipeline} as well as modify the {@link ChannelPipeline} it belongs to dynamically. / public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker { //…… }2、创建一个持有数据的ByteBuf@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取链接实例 Channel channel = ctx.channel(); //创建一个持有数据的ByteBuf ByteBuf buf = Unpooled.copiedBuffer(“data”, CharsetUtil.UTF_8);}ByteBuf又是什么呢?它是Netty框架自己封装的一个字符底层对象,是一个对 byte[] 和 ByteBuffer NIO 的抽象类,更官网的说就是“零个或多个字节的随机和顺序可访问的序列。”,如下是源码的解释/* * A random and sequential accessible sequence of zero or more bytes (octets). * This interface provides an abstract view for one or more primitive byte * arrays ({@code byte[]}) and {@linkplain ByteBuffer NIO buffers}. / public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> { //…… }由上一段源码可以看出,ByteBuf是一个抽象类,所以我们不能通过 new 的形式来创建一个新的ByteBuf对象。那么我们可以通过Netty提供的一个 final 的工具类 Unpooled(你将其看作是一个创建ByteBuf的工具类就好了)。/* * Creates a new {@link ByteBuf} by allocating new space or by wrapping * or copying existing byte arrays, byte buffers and a string. / public final class Unpooled { //…… }这真是一个有趣的过程,那么接下来我们仅需要再看看 copiedBuffer 这个方法了。这个方法相对简单,就是我们将创建一个新的缓冲区,其内容是我们指定的 UTF-8字符集 编码指定的 “data” ,同时这个新的缓冲区的读索引和写索引分别是0和字符串的长度。3、冲刷数据@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取链接实例 Channel channel = ctx.channel(); //创建一个持有数据的ByteBuf ByteBuf buf = Unpooled.copiedBuffer(“data”, CharsetUtil.UTF_8); //数据冲刷 channel.writeAndFlush(buf);}我相信大部分人都是直接这么写的,因为我们经常理所当然的启动测试,并在客户端接受到了这个 “data” 消息。那么我们是否应该注意一下,这个数据冲刷会返回一个什么值,我们要如何才能在服务端知道,这次数据冲刷是成功还是失败呢?那么其实Netty框架已经考虑到了这个点,本次数据冲刷我们将得到一个 ChannelFuture 。@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取链接实例 Channel channel = ctx.channel(); //创建一个持有数据的ByteBuf ByteBuf buf = Unpooled.copiedBuffer(“data”, CharsetUtil.UTF_8); //数据冲刷 ChannelFuture cf = channel.writeAndFlush(buf);}是的,他就是 Channel 异步IO操作的结果,它是一个接口,并继承了Future<V>。(如下为源码的解释)/* * The result of an asynchronous {@link Channel} I/O operation. / public interface ChannelFuture extends Future<Void> { //…… }既然如此,那么我们可以明显的知道我们可以对其添加对应的监听。4、异步回调结果监听@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取链接实例 Channel channel = ctx.channel(); //创建一个持有数据的ByteBuf ByteBuf buf = Unpooled.copiedBuffer(“data”, CharsetUtil.UTF_8); //数据冲刷 ChannelFuture cf = channel.writeAndFlush(buf); //添加ChannelFutureListener以便在写操作完成后接收通知 cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { //写操作完成,并没有错误发生 if (future.isSuccess()){ System.out.println(“successful”); }else{ //记录错误 System.out.println(“error”); future.cause().printStackTrace(); } } });}好的,我们可以简单的从代码理解到,我们将通过对异步IO的结果监听,得到本次运行的结果。我想这才是一个相对完整的 数据冲刷(writeAndFlush)。测试线程安全的流程对于线程安全的测试,我们将模拟多个线程去执行数据冲刷操作,我们可以用到 Executor 。我们可以这样理解 Executor ,是一种省略了线程启用与调度的方式,你只需要传递一个 Runnable 给它即可,你不再需要去 start 一个线程。(如下是源码的解释)/* * An object that executes submitted {@link Runnable} tasks. This * interface provides a way of decoupling task submission from the * mechanics of how each task will be run, including details of thread * use, scheduling, etc. An {@code Executor} is normally used * instead of explicitly creating threads. For example, rather than * invoking {@code new Thread(new(RunnableTask())).start()} for each * of a set of tasks, you might use:… / public interface Executor { //…… }那么我们的测试代码,大致是这样的。final Channel channel = ctx.channel();//创建要写数据的ByteBuffinal ByteBuf buf = Unpooled.copiedBuffer(“data”,CharsetUtil.UTF_8).retain();//创建将数据写到Channel的RunnableRunnable writer = new Runnable() { @Override public void run() { ChannelFuture cf = channel.writeAndFlush(buf.duplicate()); cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { //写操作完成,并没有错误发生 if (future.isSuccess()){ System.out.println(“successful”); }else{ //记录错误 System.out.println(“error”); future.cause().printStackTrace(); } } }); }};//获取到线程池的Executor的引用Executor executor = Executors.newCachedThreadPool();//提交到某个线程中执行executor.execute(writer);//提交到另一个线程中执行executor.execute(writer);这里,我们需要注意的是:创建 ByteBuf 的时候,我们使用了 retain 这个方法,他是将我们生成的这个 ByteBuf 进行保留操作。在 ByteBuf 中有这样的一种区域: 非保留和保留派生缓冲区。这里有点复杂,我们可以简单的理解,如果调用了 retain 那么数据就存在派生缓冲区中,如果没有调用,则会在调用后,移除这一个字符数据。(如下是 ByteBuf 源码的解释)/<h4>Non-retained and retained derived buffers</h4> * * Note that the {@link #duplicate()}, {@link #slice()}, {@link #slice(int, int)} and {@link #readSlice(int)} does NOT * call {@link #retain()} on the returned derived buffer, and thus its reference count will NOT be increased. If you * need to create a derived buffer with increased reference count, consider using {@link #retainedDuplicate()}, * {@link #retainedSlice()}, {@link #retainedSlice(int, int)} and {@link #readRetainedSlice(int)} which may return * a buffer implementation that produces less garbage. */好的,我想你可以自己动手去测试一下,最好再看看源码,加深一下实现的原理印象。这里的线程池并不是现实线程安全,而是用来做测试多线程的,Netty的Channel实现是线程安全的,所以我们可以存储一个到Channel的引用,并且每当我们需要向远程节点写数据时,都可以使用它,即使当时许多线程都在使用它,消息也会被保证按顺序发送的。结语最后,介绍一下,个人的一个基于Netty的开源项目:InChat一个轻量级、高效率的支持多端(应用与硬件Iot)的异步网络应用通讯框架参考资料: 《Netty实战》 ...

December 24, 2018 · 3 min · jiezi

Golang并发模型:并发协程的优雅退出

goroutine作为Golang并发的核心,我们不仅要关注它们的创建和管理,当然还要关注如何合理的退出这些协程,不(合理)退出不然可能会造成阻塞、panic、程序行为异常、数据结果不正确等问题。这篇文章介绍,如何合理的退出goroutine,减少软件bug。goroutine在退出方面,不像线程和进程,不能通过某种手段强制关闭它们,只能等待goroutine主动退出。但也无需为退出、关闭goroutine而烦恼,下面就介绍3种优雅退出goroutine的方法,只要采用这种最佳实践去设计,基本上就可以确保goroutine退出上不会有问题,尽情享用。1:使用for-range退出for-range是使用频率很高的结构,常用它来遍历数据,range能够感知channel的关闭,当channel被发送数据的协程关闭时,range就会结束,接着退出for循环。它在并发中的使用场景是:当协程只从1个channel读取数据,然后进行处理,处理后协程退出。下面这个示例程序,当in通道被关闭时,协程可自动退出。go func(in <-chan int) { // Using for-range to exit goroutine // range has the ability to detect the close/end of a channel for x := range in { fmt.Printf(“Process %d\n”, x) }}(inCh)2:使用,ok退出for-select也是使用频率很高的结构,select提供了多路复用的能力,所以for-select可以让函数具有持续多路处理多个channel的能力。但select没有感知channel的关闭,这引出了2个问题:继续在关闭的通道上读,会读到通道传输数据类型的零值,如果是指针类型,读到nil,继续处理还会产生nil。继续在关闭的通道上写,将会panic。问题2可以这样解决,通道只由发送方关闭,接收方不可关闭,即某个写通道只由使用该select的协程关闭,select中就不存在继续在关闭的通道上写数据的问题。问题1可以使用,ok来检测通道的关闭,使用情况有2种。第一种:如果某个通道关闭后,需要退出协程,直接return即可。示例代码中,该协程需要从in通道读数据,还需要定时打印已经处理的数量,有2件事要做,所有不能使用for-range,需要使用for-select,当in关闭时,ok=false,我们直接返回。go func() { // in for-select using ok to exit goroutine for { select { case x, ok := <-in: if !ok { return } fmt.Printf(“Process %d\n”, x) processedCnt++ case <-t.C: fmt.Printf(“Working, processedCnt = %d\n”, processedCnt) } }}()第二种:如果某个通道关闭了,不再处理该通道,而是继续处理其他case,退出是等待所有的可读通道关闭。我们需要使用select的一个特征:select不会在nil的通道上进行等待。这种情况,把只读通道设置为nil即可解决。go func() { // in for-select using ok to exit goroutine for { select { case x, ok := <-in1: if !ok { in1 = nil } // Process case y, ok := <-in2: if !ok { in2 = nil } // Process case <-t.C: fmt.Printf(“Working, processedCnt = %d\n”, processedCnt) } // If both in channel are closed, goroutine exit if in1 == nil && in2 == nil { return } }}()3:使用退出通道退出使用,ok来退出使用for-select协程,解决是当读入数据的通道关闭时,没数据读时程序的正常结束。想想下面这2种场景,,ok还能适用吗?接收的协程要退出了,如果它直接退出,不告知发送协程,发送协程将阻塞。启动了一个工作协程处理数据,如何通知它退出?使用一个专门的通道,发送退出的信号,可以解决这类问题。以第2个场景为例,协程入参包含一个停止通道stopCh,当stopCh被关闭,case <-stopCh会执行,直接返回即可。当我启动了100个worker时,只要main()执行关闭stopCh,每一个worker都会都到信号,进而关闭。如果main()向stopCh发送100个数据,这种就低效了。func worker(stopCh <-chan struct{}) { go func() { defer fmt.Println(“worker exit”) // Using stop channel explicit exit for { select { case <-stopCh: fmt.Println(“Recv stop signal”) return case <-t.C: fmt.Println(“Working .”) } } }() return}最佳实践回顾发送协程主动关闭通道,接收协程不关闭通道。技巧:把接收方的通道入参声明为只读,如果接收协程关闭只读协程,编译时就会报错。协程处理1个通道,并且是读时,协程优先使用for-range,因为range可以关闭通道的关闭自动退出协程。,ok可以处理多个读通道关闭,需要关闭当前使用for-select的协程。显式关闭通道stopCh可以处理主动通知协程退出的场景。完整示例代码本文所有代码都在仓库,可查看完整示例代码:https://github.com/Shitaibin/…并发系列文章推荐Golang并发模型:轻松入门流水线模型Golang并发模型:轻松入门流水线FAN模式Golang并发模型:并发协程的优雅退出如果这篇文章对你有帮助,请点个赞/喜欢,鼓励我持续分享,感谢。如果喜欢本文,随意转载,但请保留此原文链接。 ...

December 4, 2018 · 1 min · jiezi

Golang并发模型:轻松入门流水线FAN模式

前一篇文章《Golang并发模型:轻松入门流水线模型》,介绍了流水线模型的概念,这篇文章是流水线模型进阶,介绍FAN-IN和FAN-OUT,FAN模式可以让我们的流水线模型更好的利用Golang并发,提高软件性能。但FAN模式不一定是万能,不见得能提高程序的性能,甚至还不如普通的流水线。我们先介绍下FAN模式,再看看它怎么提升性能的,它是不是万能的。这篇文章内容略多,本来打算分几次写的,但不如一次读完爽,所以干脆还是放一篇文章了,要是时间不充足,利用好碎片时间,可以每次看1个标题的内容。FAN-IN和FAN-OUT模式Golang的并发模式灵感来自现实世界,这些模式是通用的,毫无例外,FAN模式也是对当前世界的模仿。以汽车组装为例,汽车生产线上有个阶段是给小汽车装4个轮子,可以把这个阶段任务交给4个人同时去做,这4个人把轮子都装完后,再把汽车移动到生产线下一个阶段。这个过程中,就有任务的分发,和任务结果的收集。其中任务分发是FAN-OUT,任务收集是FAN-IN。FAN-OUT模式:多个goroutine从同一个通道读取数据,直到该通道关闭。OUT是一种张开的模式,所以又被称为扇出,可以用来分发任务。FAN-IN模式:1个goroutine从多个通道读取数据,直到这些通道关闭。IN是一种收敛的模式,所以又被称为扇入,用来收集处理的结果。FAN-IN和FAN-OUT实践我们这次试用FAN-OUT和FAN-IN,解决《Golang并发模型:轻松入门流水线模型》中提到的问题:计算一个整数切片中元素的平方值并把它打印出来。producer()保持不变,负责生产数据。squre()也不变,负责计算平方值。修改main(),启动3个square,这3个squre从producer生成的通道读数据,这是FAN-OUT。增加merge(),入参是3个square各自写数据的通道,给这3个通道分别启动1个协程,把数据写入到自己创建的通道,并返回该通道,这是FAN-IN。FAN模式流水线示例:package mainimport ( “fmt” “sync”)func producer(nums …int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { out <- i } }() return out}func square(inCh <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range inCh { out <- n * n } }() return out}func merge(cs …<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup collect := func(in <-chan int) { defer wg.Done() for n := range in { out <- n } } wg.Add(len(cs)) // FAN-IN for _, c := range cs { go collect(c) } // 错误方式:直接等待是bug,死锁,因为merge写了out,main却没有读 // wg.Wait() // close(out) // 正确方式 go func() { wg.Wait() close(out) }() return out}func main() { in := producer(1, 2, 3, 4) // FAN-OUT c1 := square(in) c2 := square(in) c3 := square(in) // consumer for ret := range merge(c1, c2, c3) { fmt.Printf("%3d “, ret) } fmt.Println()}3个squre协程并发运行,结果顺序是无法确定的,所以你得到的结果,不一定与下面的相同。➜ awesome git:(master) ✗ go run hi.go 1 4 16 9 FAN模式真能提升性能吗?相信你心里已经有了答案,可以的。我们还是使用老问题,对比一下简单的流水线和FAN模式的流水线,修改下代码,增加程序的执行时间:produer()使用参数生成指定数量的数据。square()增加阻塞操作,睡眠1s,模拟阶段的运行时间。main()关闭对结果数据的打印,降低结果处理时的IO对FAN模式的对比。普通流水线:// hi_simple.gopackage mainimport ( “fmt”)func producer(n int) <-chan int { out := make(chan int) go func() { defer close(out) for i := 0; i < n; i++ { out <- i } }() return out}func square(inCh <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range inCh { out <- n * n // simulate time.Sleep(time.Second) } }() return out}func main() { in := producer(10) ch := square(in) // consumer for _ = range ch { }}使用FAN模式的流水线:// hi_fan.gopackage mainimport ( “sync” “time”)func producer(n int) <-chan int { out := make(chan int) go func() { defer close(out) for i := 0; i < n; i++ { out <- i } }() return out}func square(inCh <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range inCh { out <- n * n // simulate time.Sleep(time.Second) } }() return out}func merge(cs …<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup collect := func(in <-chan int) { defer wg.Done() for n := range in { out <- n } } wg.Add(len(cs)) // FAN-IN for _, c := range cs { go collect(c) } // 错误方式:直接等待是bug,死锁,因为merge写了out,main却没有读 // wg.Wait() // close(out) // 正确方式 go func() { wg.Wait() close(out) }() return out}func main() { in := producer(10) // FAN-OUT c1 := square(in) c2 := square(in) c3 := square(in) // consumer for _ = range merge(c1, c2, c3) { }}多次测试,每次结果近似,结果如下:FAN模式利用了7%的CPU,而普通流水线CPU只使用了3%,FAN模式能够更好的利用CPU,提供更好的并发,提高Golang程序的并发性能。FAN模式耗时10s,普通流水线耗时4s。在协程比较费时时,FAN模式可以减少程序运行时间,同样的时间,可以处理更多的数据。➜ awesome git:(master) ✗ time go run hi_simple.gogo run hi_simple.go 0.17s user 0.18s system 3% cpu 10.389 total➜ awesome git:(master) ✗ ➜ awesome git:(master) ✗ time go run hi_fan.gogo run hi_fan.go 0.17s user 0.16s system 7% cpu 4.288 total也可以使用Benchmark进行测试,看2个类型的执行时间,结论相同。为了节约篇幅,这里不再介绍,方法和结果贴在Gist了,想看的朋友瞄一眼,或自己动手搞搞。FAN模式一定能提升性能吗?FAN模式可以提高并发的性能,那我们是不是可以都使用FAN模式?不行的,因为FAN模式不一定能提升性能。依然使用之前的问题,再次修改下代码,其他不变:squre()去掉耗时。main()增加producer()的入参,让producer生产10,000,000个数据。简单版流水线修改代码:// hi_simple.gofunc square(inCh <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range inCh { out <- n * n } }() return out}func main() { in := producer(10000000) ch := square(in) // consumer for _ = range ch { }}FAN模式流水线修改代码:// hi_fan.gopackage mainimport ( “sync”)func square(inCh <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range inCh { out <- n * n } }() return out}func main() { in := producer(10000000) // FAN-OUT c1 := square(in) c2 := square(in) c3 := square(in) // consumer for _ = range merge(c1, c2, c3) { }}结果,可以跑多次,结果近似:➜ awesome git:(master) ✗ time go run hi_simple.go go run hi_simple.go 9.96s user 5.93s system 168% cpu 9.424 total➜ awesome git:(master) ✗ time go run hi_fan.go go run hi_fan.go 23.35s user 11.51s system 297% cpu 11.737 total从这个结果,我们能看到2点。FAN模式可以提高CPU利用率。FAN模式不一定能提升效率,降低程序运行时间。优化FAN模式既然FAN模式不一定能提高性能,如何优化?不同的场景优化不同,要依具体的情况,解决程序的瓶颈。我们当前程序的瓶颈在FAN-IN,squre函数很快就完成,merge函数它把3个数据写入到1个通道的时候出现了瓶颈,适当使用带缓冲通道可以提高程序性能,再修改下代码merge()中的out修改为:out := make(chan int, 100)结果:➜ awesome git:(master) ✗ time go run hi_fan_buffered.go go run hi_fan_buffered.go 19.85s user 8.19s system 323% cpu 8.658 total使用带缓存通道后,程序的性能有了较大提升,CPU利用率提高到323%,提升了8%,运行时间从11.7降低到8.6,降低了26%。FAN模式的特点很简单,相信你已经掌握了,如果记不清了看这里,本文所有代码在该Github仓库。FAN模式很有意思,并且能提高Golang并发的性能,如果想以后运用自如,用到自己的项目中去,还是要写写自己的Demo,快去实践一把。下一篇,写流水线中协程的“优雅退出”,欢迎关注。如果这篇文章对你有帮助,请点个赞/喜欢,让我知道我的写作是有价值的,感谢。 ...

November 28, 2018 · 3 min · jiezi

Golang并发模型:轻松入门流水线模型

Golang作为一个实用主义的编程语言,非常注重性能,在语言特性上天然支持并发,它有多种并发模型,通过流水线模型系列文章,你会更好的使用Golang并发特性,提高你的程序性能。这篇文章主要介绍流水线模型的流水线概念,后面文章介绍流水线模型的FAN-IN和FAN-OUT,最后介绍下如何合理的关闭流水线的协程。Golang的并发核心思路Golang并发核心思路是关注数据流动。数据流动的过程交给channel,数据处理的每个环节都交给goroutine,把这些流程画起来,有始有终形成一条线,那就能构成流水线模型。但我们先从简单的入手。从一个简单的流水线入手流水线并不是什么新奇的概念,它能极大的提高生产效率,在当代社会流水线非常普遍,我们用的几乎任何产品(手机、电脑、汽车、水杯),都是从流水线上生产出来的。以汽车为例,整个汽车流水线要经过几百个组装点,而在某个组装点只组装固定的零部件,然后传递给下一个组装点,最终一台完整的汽车从流水线上生产出来。Golang的并发模型灵感其实都来自我们生活,对软件而言,高的生产效率就是高的性能。在Golang中,流水线由多个阶段组成,每个阶段之间通过channel连接,每个节点可以由多个同时运行的goroutine组成。从最简单的流水线入手。下图的流水线由3个阶段组成,分别是A、B、C,A和B之间是通道aCh,B和C之间是通道bCh,A生成数据传递给B,B生成数据传递给C。流水线中,第一个阶段的协程是生产者,它们只生产数据。最后一个阶段的协程是消费者,它们只消费数据。下图中A是生成者,C是消费者,而B只是中间过程的处理者。举个例子,设计一个程序:计算一个整数切片中元素的平方值并把它打印出来。非并发的方式是使用for遍历整个切片,然后计算平方,打印结果。我们使用流水线模型实现这个简单的功能,从流水线的角度,可以分为3个阶段:遍历切片,这是生产者。计算平方值。打印结果,这是消费者。下面这段代码:producer()负责生产数据,它会把数据写入通道,并把它写数据的通道返回。square()负责从某个通道读数字,然后计算平方,将结果写入通道,并把它的输出通道返回。main()负责启动producer和square,并且还是消费者,读取suqre的结果,并打印出来。package mainimport ( “fmt”)func producer(nums …int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { out <- n } }() return out}func square(inCh <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range inCh { out <- n * n } }() return out}func main() { in := producer(1, 2, 3, 4) ch := square(in) // consumer for ret := range ch { fmt.Printf("%3d", ret) } fmt.Println()}结果:➜ awesome git:(master) ✗ go run hi.go 1 4 9 16这是一种原始的流水线模型,这种原始能让我们掌握流水线的思路。流水线的特点每个阶段把数据通过channel传递给下一个阶段。每个阶段要创建1个goroutine和1个通道,这个goroutine向里面写数据,函数要返回这个通道。有1个函数来组织流水线,我们例子中是main函数。如果你没了解过流水线,建议自己把以上的程序写一遍,如果遇到问题解决了,那才真正掌握了流水线模型的思路。下一篇,我将介绍流水线模型的FAN-IN、FAN-OUT,欢迎关注。如果这篇文章对你有帮助,请点个赞/喜欢,让我知道我的写作是有价值的,感谢。 ...

November 26, 2018 · 1 min · jiezi