前言

这周姐姐入职了新公司,老板想探探他的底,看了一眼他的简历,呦呵,精通kafka,这小姑娘有两下子,既然这样,那你写一个音讯队列吧。因为要用go语言写,这可给姐姐愁坏了。连忙来求助我,我这么坚贞不屈一人,在姐姐的软磨硬泡下还是许可他了,所以接下来我就手把手教姐姐怎么写一个音讯队列。上面咱们就来看一看我是怎么写的吧~~~。

本代码已上传到我的github:

有须要的小伙伴,可自行下载,顺便给个小星星吧~~~

什么是音讯队列

姐姐真是把我愁坏了,本人写的精通kafka,居然不晓得什么是音讯队列,于是,一贯好脾气的我开始给姐姐讲一讲什么是音讯队列。

音讯队列,咱们个别称它为MQ(Message Queue),两个单词的联合,这两个英文单词想必大家都应该晓得吧,其实最相熟的还是Queue吧,即队列。队列是一种先进先出的数据结构,队列的应用还是比拟广泛的,然而曾经有队列了,怎么还须要MQ呢?

我:问你呢,姐姐,晓得吗?为什么还须要MQ

姐姐:快点讲,想挨打呀?

我:噗。。。 算我多嘴,哼~~~

欠欠的我开始了接下来的急躁解说......

举一个简略的例子,假如当初咱们要做一个零碎,该登陆零碎须要在用户登陆胜利后,发送封邮件到用户邮箱进行揭示,需要还是很简略的,咱们先开看一看没有MQ,咱们该怎么实现呢?画一个时序图来看一看:

看这个图,邮件发送在申请登陆时进行,当明码验证胜利后,就发送邮件,而后返回登陆胜利。这样是能够的,然而他是有缺点的。这让咱们的登陆操作变得复杂了,每次申请登陆都须要进行邮件发送,如果这里呈现谬误,整个登陆申请也呈现了谬误,导致登陆不胜利;还有一个问题,原本咱们登陆申请调用接口仅仅须要100ms,因为两头要做一次发送邮件的期待,那么调用一次登陆接口的工夫就要增长,这就是问题所在,一封邮件他的优先级 不是很高的,用户也不须要实时收到这封邮件,所以这时,就体现了音讯队列的重要性了,咱们用音讯队列进行改良一下。

这里咱们将发送邮件申请放到Mq中,这样咱们就能进步用户体验的吞吐量,这个很重要,顾客就是上帝嘛,毕竟也没有人喜爱用一个很慢很慢的app。

这里只是举了MQ泛滥利用中的其中一个,即异步利用,MQ还在零碎解藕、削峰/限流中有着重要利用,这两个我就不具体解说了,原理都一样,好好思考一下,你们都能懂得。

channel

好啦,姐姐终于晓得什么是音讯队列了,然而当初还是没法进行音讯队列开发的,因为还差一个知识点,即go语言中的channel。这个很重要,咱们还须要靠这个来开发咱们的音讯队列呢。

因篇幅无限,这里不具体介绍channel,只介绍根本应用办法。

什么是channel

Goroutine 和 Channel 是 Go 语言并发编程的两大基石。Goroutine 用于执行并发工作,Channel 用于 goroutine 之间的同步、通信。Go提倡应用通信的办法代替共享内存,当一个Goroutine须要和其余Goroutine资源共享时,Channel就会在他们之间架起一座桥梁,并提供确保安全同步的机制。channel实质上其实还是一个队列,遵循FIFO准则。具体规定如下:

  • 先从 Channel 读取数据的 Goroutine 会先接管到数据;
  • 先向 Channel 发送数据的 Goroutine 会失去先发送数据的权力;

创立通道

创立通道须要用到关键字 make ,格局如下:

通道实例 := make(chan 数据类型)
  • 数据类型:通道内传输的元素类型。
  • 通道实例:通过make创立的通道句柄。

无缓冲通道的应用

Go语言中无缓冲的通道(unbuffered channel)是指在接管前没有能力保留任何值的通道。这种类型的通道要求发送 goroutine 和接管 goroutine 同时筹备好,能力实现发送和接管操作。

无缓冲通道的定义形式如下:

通道实例 := make(chan 通道类型)
  • 通道类型:和无缓冲通道用法统一,影响通道发送和接管的数据类型。
  • 缓冲大小:0
  • 通道实例:被创立出的通道实例。

写个例子来帮忙大家了解一下吧:

package mainimport (    "sync"    "time")func main() {    c := make(chan string)    var wg sync.WaitGroup    wg.Add(2)    go func() {        defer wg.Done()        c <- `Golang梦工厂`    }()    go func() {        defer wg.Done()        time.Sleep(time.Second * 1)        println(`Message: `+ <-c)    }()    wg.Wait()}

带缓冲的通道的应用

Go语言中有缓冲的通道(buffered channel)是一种在被接管前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时实现发送和接管。通道会阻塞发送和接管动作的条件也会不同。只有在通道中没有要接管的值时,接管动作才会阻塞。只有在通道没有可用缓冲区包容被发送的值时,发送动作才会阻塞。

有缓冲通道的定义形式如下:

通道实例 := make(chan 通道类型, 缓冲大小)
  • 通道类型:和无缓冲通道用法统一,影响通道发送和接管的数据类型。
  • 缓冲大小:决定通道最多能够保留的元素数量。
  • 通道实例:被创立出的通道实例。

来写一个例子解说一下:

package mainimport (    "sync"    "time")func main() {    c := make(chan string, 2)    var wg sync.WaitGroup    wg.Add(2)    go func() {        defer wg.Done()        c <- `Golang梦工厂`        c <- `asong`    }()    go func() {        defer wg.Done()        time.Sleep(time.Second * 1)        println(`公众号: `+ <-c)        println(`作者: `+ <-c)    }()    wg.Wait()}

好啦,通道的概念就介绍到这里了,如果须要,下一篇我出一个channel具体解说的文章。

音讯队列编码实现

筹备篇

终于开始进入主题了,姐姐都听的快要睡着了,我轰隆一嗓子,立马精力,然而呢,asong也是挨了一顿小电炮,代价惨痛呀,呜呜呜............

在开始编写代码编写间接,我须要构思咱们的整个代码架构,这才是正确的编码方式。咱们先来定义一个接口,把咱们须要实现的办法先列进去,前期对每一个代码进行实现就能够了。因而能够列出如下办法:

type Broker interface { publish(topic string, msg interface{}) error subscribe(topic string) (<-chan interface{}, error) unsubscribe(topic string, sub <-chan interface{}) error close() broadcast(msg interface{}, subscribers []chan interface{}) setConditions(capacity int)}
  • publish:进行音讯的推送,有两个参数即topicmsg,别离是订阅的主题、要传递的音讯
  • subscribe:音讯的订阅,传入订阅的主题,即可实现订阅,并返回对应的channel通道用来接收数据
  • unsubscribe:勾销订阅,传入订阅的主题和对应的通道
  • close:这个的作用就是很显著了,就是用来敞开音讯队列的
  • broadCast:这个属于外部办法,作用是进行播送,对推送的音讯进行播送,保障每一个订阅者都能够收到
  • setConditions:这里是用来设置条件,条件就是音讯队列的容量,这样咱们就能够管制音讯队列的大小了

仔细的你们有没有发现什么问题,这些代码我都定义的是外部办法,也就是包外不可用。为什么这么做呢,因为这里属于代理要做的事件,咱们还须要在封装一层,也就是客户端能间接调用的办法,这样才合乎软件架构。因而能够写出如下代码:

package mqtype Client struct { bro *BrokerImpl}func NewClient() *Client { return &Client{  bro: NewBroker(), }}func (c *Client)SetConditions(capacity int)  { c.bro.setConditions(capacity)}func (c *Client)Publish(topic string, msg interface{}) error{ return c.bro.publish(topic,msg)}func (c *Client)Subscribe(topic string) (<-chan interface{}, error){ return c.bro.subscribe(topic)}func (c *Client)Unsubscribe(topic string, sub <-chan interface{}) error { return c.bro.unsubscribe(topic,sub)}func (c *Client)Close()  {  c.bro.close()}func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{ for val:= range sub{  if val != nil{   return val  } } return nil}

下面只是准好了代码构造,然而音讯队列实现的构造咱们还没有设计,当初咱们就来设计一下。

type BrokerImpl struct { exit chan bool capacity int topics map[string][]chan interface{} // key: topic  value : queue sync.RWMutex // 同步锁}
  • exit:也是一个通道,这个用来做敞开音讯队列用的
  • capacity:即用来设置音讯队列的容量
  • topics:这里应用一个map构造,key即是topic,其值则是一个切片,chan类型,这里这么做的起因是咱们一个topic能够有多个订阅者,所以一个订阅者对应着一个通道
  • sync.RWMutex:读写锁,这里是为了避免并发状况下,数据的推送呈现谬误,所以采纳加锁的形式进行保障

好啦,当初咱们曾经筹备的很充沛啦,开始接下来办法填充之旅吧~~~

Publishbroadcast

这里两个合在一起讲的起因是braodcast是属于publish里的。这里的思路很简略,咱们只须要把传入的数据进行播送即可了,上面咱们来看代码实现:

func (b *BrokerImpl) publish(topic string, pub interface{}) error { select { case <-b.exit:  return errors.New("broker closed") default: } b.RLock() subscribers, ok := b.topics[topic] b.RUnlock() if !ok {  return nil } b.broadcast(pub, subscribers) return nil}func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) { count := len(subscribers) concurrency := 1 switch { case count > 1000:  concurrency = 3 case count > 100:  concurrency = 2 default:  concurrency = 1 } pub := func(start int) {  for j := start; j < count; j += concurrency {   select {   case subscribers[j] <- msg:   case <-time.After(time.Millisecond * 5):   case <-b.exit:    return   }  } } for i := 0; i < concurrency; i++ {  go pub(i) }}

publish办法中没有什么好讲的,这里次要说一下broadcast的实现:

这里次要对数据进行播送,所以数据推送进来就能够了,没必要始终等着他推送胜利,所以这里咱们咱们采纳goroutine。在推送的时候,当推送失败时,咱们也不能始终期待呀,所以这里咱们加了一个超时机制,超过5毫秒就进行推送,接着进行上面的推送。

可能你们会有纳闷,下面怎么还有一个switch选项呀,干什么用的呢?思考这样一个问题,当有大量的订阅者时,,比方10000个,咱们一个for循环去做音讯的推送,那推送一次就会消耗很多工夫,并且不同的消费者之间也会产生延时,,所以采纳这种办法进行合成能够升高肯定的工夫。

subscribeunsubScribe

咱们先来看代码:

func (b *BrokerImpl) subscribe(topic string) (<-chan interface{}, error) { select { case <-b.exit:  return nil, errors.New("broker closed") default: } ch := make(chan interface{}, b.capacity) b.Lock() b.topics[topic] = append(b.topics[topic], ch) b.Unlock() return ch, nil}func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error { select { case <-b.exit:  return errors.New("broker closed") default: } b.RLock() subscribers, ok := b.topics[topic] b.RUnlock() if !ok {  return nil } // delete subscriber var newSubs []chan interface{} for _, subscriber := range subscribers {  if subscriber == sub {   continue  }  newSubs = append(newSubs, subscriber) } b.Lock() b.topics[topic] = newSubs b.Unlock() return nil}

这里其实就很简略了:

  • subscribe:这里的实现则是为订阅的主题创立一个channel,而后将订阅者退出到对应的topic中就能够了,并且返回一个接管channel
  • unsubScribe:这里实现的思路就是将咱们方才增加的channel删除就能够了。

close

func (b *BrokerImpl) close()  { select { case <-b.exit:  return default:  close(b.exit)  b.Lock()  b.topics = make(map[string][]chan interface{})  b.Unlock() } return}

这里就是为了敞开整个音讯队列,这句代码b.topics = make(map[string][]chan interface{})比拟重要,这里次要是为了保障下一次应用该音讯队列不发生冲突。

setConditions GetPayLoad

还差最初两个办法,一个是设置咱们的音讯队列容量,另一个是封装一个办法来获取咱们订阅的音讯:

func (b *BrokerImpl)setConditions(capacity int)  { b.capacity = capacity}func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{ for val:= range sub{  if val != nil{   return val  } } return nil}

测试

好啦,代码这么快就被写完了,接下来咱们进行测试一下吧。

单元测试

正式测试之前,咱们还是须要先进行一下单元测试,养成好的习惯,只有先自测了,能力有底气说我的代码没问题,要不间接跑程序,会呈现很多bug的。

这里咱们测试方法如下:咱们向不同的topic发送不同的信息,当订阅者收到音讯后,就行勾销订阅。

func TestClient(t *testing.T) { b := NewClient() b.SetConditions(100) var wg sync.WaitGroup for i := 0; i < 100; i++ {  topic := fmt.Sprintf("Golang梦工厂%d", i)  payload := fmt.Sprintf("asong%d", i)  ch, err := b.Subscribe(topic)  if err != nil {   t.Fatal(err)  }  wg.Add(1)  go func() {   e := b.GetPayLoad(ch)   if e != payload {    t.Fatalf("%s expected %s but get %s", topic, payload, e)   }   if err := b.Unsubscribe(topic, ch); err != nil {    t.Fatal(err)   }   wg.Done()  }()  if err := b.Publish(topic, payload); err != nil {   t.Fatal(err)  } } wg.Wait()}

测试通过,没问题,接下来咱们在写几个办法测试一下

测试

这里分为两种形式测试

测试一:应用一个定时器,向一个主题定时推送音讯.

// 一个topic 测试func OnceTopic()  { m := mq.NewClient() m.SetConditions(10) ch,err :=m.Subscribe(topic) if err != nil{  fmt.Println("subscribe failed")  return } go OncePub(m) OnceSub(ch,m) defer m.Close()}// 定时推送func OncePub(c *mq.Client)  { t := time.NewTicker(10 * time.Second) defer t.Stop() for  {  select {  case <- t.C:   err := c.Publish(topic,"asong真帅")   if err != nil{    fmt.Println("pub message failed")   }  default:  } }}// 承受订阅音讯func OnceSub(m <-chan interface{},c *mq.Client)  { for  {  val := c.GetPayLoad(m)  fmt.Printf("get message is %sn",val) }}

测试二:应用一个定时器,定时向多个主题发送音讯:

//多个topic测试func ManyTopic()  { m := mq.NewClient() defer m.Close() m.SetConditions(10) top := "" for i:=0;i<10;i++{  top = fmt.Sprintf("Golang梦工厂_%02d",i)  go Sub(m,top) } ManyPub(m)}func ManyPub(c *mq.Client)  { t := time.NewTicker(10 * time.Second) defer t.Stop() for  {  select {  case <- t.C:   for i:= 0;i<10;i++{    //多个topic 推送不同的音讯    top := fmt.Sprintf("Golang梦工厂_%02d",i)    payload := fmt.Sprintf("asong真帅_%02d",i)    err := c.Publish(top,payload)    if err != nil{     fmt.Println("pub message failed")    }   }  default:  } }}func Sub(c *mq.Client,top string)  { ch,err := c.Subscribe(top) if err != nil{  fmt.Printf("sub top:%s failedn",top) } for  {  val := c.GetPayLoad(ch)  if val != nil{   fmt.Printf("%s get message is %sn",top,val)  } }}

总结

终于帮忙姐姐解决了这个问题,姐姐开心死了,给我一顿亲,啊不对,是一顿夸,夸的人家都不好意思了。

这一篇你学会了吗?没学会不要紧,赶快去把源代码下载下来,好好通读一下,很好了解的~~~。

其实这一篇是为了接下来的kafka学习打基础的,学好了这一篇,接下来学习的kafka就会容易很多啦~~~

github地址:https://github.com/asong2020/...

如果能给一个小星星就好了~~~

结尾给大家发一个小福利吧,最近我在看[微服务架构设计模式]这一本书,讲的很好,本人也收集了一本PDF,有须要的小伙能够到自行下载。获取形式:关注公众号:[Golang梦工厂],后盾回复:[微服务],即可获取。

我翻译了一份GIN中文文档,会定期进行保护,有须要的小伙伴后盾回复[gin]即可下载。

我是asong,一名普普通通的程序猿,让我一起缓缓变强吧。我本人建了一个golang交换群,有须要的小伙伴加我vx,我拉你入群。欢送各位的关注,咱们下期见~~~

举荐往期文章:

  • 详解Context包,看这一篇就够了!!!
  • go-ElasticSearch入门看这一篇就够了(一)
  • 面试官:go中for-range应用过吗?这几个问题你能解释一下起因吗
  • 学会wire依赖注入、cron定时工作其实就这么简略!
  • 据说你还不会jwt和swagger-饭我都不吃了带着实际我的项目我就来了
  • 把握这些Go语言个性,你的程度将进步N个品位(二)
  • go实现多人聊天室,在这里你想聊什么都能够的啦!!!
  • grpc实际-学会grpc就是这么简略
  • go规范库rpc实际
  • 2020最新Gin框架中文文档 asong又捡起来了英语,用心翻译
  • 基于gin的几种热加载形式
  • boss: 这小子还不会应用validator库进行数据校验,开了~~~