仓库地址
go get -u github.com/lazychanger/go-amqp
注意事项
- rabbitmq将连贯与管道离开,即
connection
与channel
。connection
是实体连贯,而channel
是逻辑连贯。所以当咱们多线程应用时候,应该是单connection
+多channel
connection
闪断当前,channel
也会生效。所以重连当前,还须要从新建设channel
- 所有的
推送
,生产
都是基于channel
,所以channel
重连当前,还须要从新开启推送
、生产
- 在
connection
闪断当前。推送
是有效的,这个时候再去推送,必定会报错。所以咱们重连机制,必须要保障,connection
在闪断时候,也要保障推送
接口失常
单channel
治理
思考
咱们先定义,channel
到底代表什么?我认为单个channel
操作单条队列的发送与生产,并且须要对主服务提供重启。既然要提供重启,那就要保障单个channel
对象的生产与推送,不容许间接与channel
操作,须要进行操作隔离,避免闪断与重启期间,操作有效的channel
。要实现操作隔离,隔离生产
比较简单,收集好相干操作的handle
,重启当前,从新接入handle
即可。然而隔离推送
比拟艰难,因为对于用户来说,推送
是实时的,而且咱们也无奈保障重连工夫是多久,所以让用户期待重连是不可行的,那么该怎么操作呢?这里我抉择的是内置一条缓存队列,用户推送
数据,先进入缓存队列,再由缓存队列的消费者取出交由真正的队列,同时也要留神重连工夫过长导致内存溢出、服务端重启导致替换队列还清空。所以咱们设计的channel
应该满足以下几点
- 内置推送替换队列
- 隔离生产
提供重启机制
- 重启
channel
- 重启
生产
- 重启
推送
- 重启
- 提供优雅敞开(关机时候须要保障推送替换队列齐全推送结束)
代码案例
package go_amqpimport ( "encoding/json" "errors" "fmt" "github.com/lazychanger/go-amqp/tools" "github.com/streadway/amqp" "log" "sync" "time")type Queue struct { sync.Mutex // 队列名 name string // 交换机名 exchangeName string // 队列绑定交换机路由KEY routingKey string // 交换机模式 exchange string // 最大内置队列长度 maxOverstock int64 conn *amqp.Connection channel *amqp.Channel // 1.为什么不采纳slice。因为golang的slice咱们不不便作为队列操作,每次入出都会引发大量的内存操作。所以采纳 map 类型 // 2.为什么采纳sync.map。因为入与出是同时执行,异步操作,如果间接采纳map类型,会导致脏读、幻读等问题,咱们须要对map类型的读写须要锁,而golang有内置残缺的读写锁map模型 // 3.采纳map类型当前,须要咱们本人保护头出尾入以及长度,所以这里有qMaxI、qMinI、ql进行数据记录 q *sync.Map // 最大index qMaxI int64 // 最小index qMinI int64 // 内置队列长度 ql int64 // 是否启动开释库存 isReleaseStock bool // 是否重启生产 isReloadConsume int // 是否进行生产 isStopConsume bool // 是否筹备好 already int close bool // 消费者 cs []consume}type MessageStatus int8const ( AlreadyStop = 0 AlreadyReload = 1 AlreadySucceed = 2 MessageStatusSucceed MessageStatus = 1 MessageStatusError MessageStatus = 2 MessageStatusRequeue MessageStatus = 3)// 提供重启服务func (q *Queue) Reload(conn *amqp.Connection) error { // 先敞开状态 q.conn = conn // 标识重启中 q.already = AlreadyReload return q.init()}// 初始化操作func (q *Queue) init() error { var ( ch *amqp.Channel err error ) log.Println("[amqp] queue init start") q.Lock() ch, err = q.conn.Channel() if err != nil { return err } // 创立交换机 if err = ch.ExchangeDeclare(q.exchangeName, q.exchange, false, false, false, false, nil); err != nil { return errors.New(fmt.Sprintf("[amqp] exchange declare failed, err: %s", err)) } // 创立队列 if _, err = ch.QueueDeclare(q.name, false, false, false, false, nil); err != nil { return errors.New(fmt.Sprintf("[amqp] queue declare failed, err: %s", err)) } // 交换机绑定队列 if err = ch.QueueBind(q.name, q.routingKey, q.exchangeName, false, nil); err != nil { return errors.New(fmt.Sprintf("[amqp] queue bind failed, err: %s", err)) } // 管道替换 q.channel = ch // 告知曾经筹备结束 if q.already == AlreadyReload { q.already = AlreadySucceed // 从新触发生产 q.reloadConsume() if !q.isReleaseStock && q.ql > 0 { log.Println("init release stock") // 从新触发 go q.releaseStock() } } q.Unlock() log.Println("[amqp] queue init end") return nil}// 对推送简略封装一下,使json对象推送更加简便func (q *Queue) PublishJson(v interface{}) error { body, err := json.Marshal(v) if err != nil { return err } return q.Publish(body)}// 原始推送,要求[]bytefunc (q *Queue) Publish(data []byte) error { // 查看库存,避免溢出 if q.maxOverstock > 0 && q.ql > q.maxOverstock { return publishOverstock } if q.close { return errors.New("service closing") } // 启动锁,避免开释库存时候,独特操作导致脏写 q.Lock() // 避免并发map创立 if q.q == nil { q.q = &sync.Map{} } // 减少最大下标 q.qMaxI++ // 减少最大长度 q.ql++ // 存储值 q.q.Store(q.qMaxI, data) q.Unlock() // 查看开释库存是否启动,未启动并且channel曾经筹备结束,就启动库存开释 if !q.isReleaseStock && q.already == AlreadySucceed { go q.releaseStock() } log.Printf("published,now %d", q.ql) return nil}// 内置队列生产,库存开释func (q *Queue) releaseStock() { // 判断是否反复启动 q.Lock() if q.isReleaseStock { q.Unlock() log.Println("[amqp] release stock already run") return } // 标记服务曾经启动 q.isReleaseStock = true q.Unlock() log.Println("[amqp] release stock") for { // 如果库存为空或者channel还未筹备好就敞开循环 if q.ql == 0 || q.already != AlreadySucceed { break } // 先将以后长度取出,避免循环时候批改,变成脏读 l := q.ql // 理论启动以后轮次库存开释 for i := int64(0); i < l; i++ { if q.already != AlreadySucceed { break } // 对库存 q.Lock() log.Printf("internal queues length: %d", q.ql) // 对库存最小下标进行+1 q.qMinI++ // 缩小库存最大数 q.ql-- // 锁期间,顶部索引不变 min := q.qMinI q.Unlock() // 读取内容 body, has := q.q.Load(min) // 预防脏读 if has && body != nil { // 推送 _ = q.publish(body.([]byte)) } else { log.Println("[amqp] data error") } // 开释map空间 q.q.Delete(min) } // 本轮库存开释曾经完结,提早执行3秒后执行下一轮 ticker := time.NewTicker(time.Second * 3) select { case <-ticker.C: ticker.Stop() } } // 标记敞开 q.isReleaseStock = false}// 理论channel向队列发送数据func (q *Queue) publish(data []byte) error { return q.channel.Publish(q.exchangeName, q.routingKey, false, false, amqp.Publishing{ Body: data, })}// 增加生产内容,先存储,期待服务启动当前触发func (q *Queue) Consume(name string, consumeFunc ConsumeFunc, repeat int) error { q.cs = append(q.cs, consume{ repeat: tools.IF(repeat <= 0, 1, repeat).(int), consumeFunc: consumeFunc, name: name, }) // 尝试启动生产 q.reloadConsume() return nil}// 暂停生产func (q *Queue) StopConsume() { q.isStopConsume = true}// 启动生产func (q *Queue) StartConsume() { q.isStopConsume = false q.reloadConsume()}// 理论触发生产func (q *Queue) reloadConsume() { // 如果未启动,间接返回 if q.already != AlreadySucceed || q.isStopConsume { return } // 推送重启生产 q.isReloadConsume++ // 记录以后生产重启值 reloadConsume := q.isReloadConsume // 记录以后channel重启值 for i, c := range q.cs { // 并发生产 for l := 0; l < c.repeat; l++ { name := fmt.Sprintf("%s_%d-%d", c.name, i, l) msgs, err := q.channel.Consume(q.name, name, false, false, false, false, nil) if err != nil { log.Fatalf("[AMQP] customer register err;name: %s, %s", name, err) } else { go func(c ConsumeFunc, consumeName string, reloadConsume int) { for msg := range msgs { switch c(msg.Body, consumeName) { case MessageStatusSucceed: case MessageStatusError: _ = msg.Ack(true) break case MessageStatusRequeue: _ = msg.Reject(true) break } // 如果channel重启或者生产重启,都完结以后生产,避免溢出,或者正在敞开 if q.already != AlreadySucceed || q.isReloadConsume != reloadConsume || q.close || q.isStopConsume { break } } }(c.consumeFunc, name, reloadConsume) } } }}// 优雅重启func (q *Queue) Close() error { // 先标记敞开 q.close = true retry := 0 for { if q.ql > 0 { if q.already == AlreadySucceed { if q.isReleaseStock == false { q.releaseStock() } } else { retry++ // 如果channel没有筹备好,内置队列也没有开释完,则重试三次,三次还没有解决好,就放弃重试 if retry > 3 { break } } } else { break } ticker := time.NewTicker(time.Second / 2) select { case <-ticker.C: ticker.Stop() } } return q.channel.Close()}type consume struct { name string consumeFunc ConsumeFunc repeat int}type ConsumeFunc func(data []byte, name string) MessageStatus
多channel
治理
思考
下面介绍了单个channel
外部状态保护,那么当初就要开始对这些channel
进行治理。治理内容如下:
channel
注册- 断线重启
- 优雅敞开
代码
package go_amqpimport ( "github.com/streadway/amqp" "log" "sync" "time")type Connection struct { // 配置 config *config // amqp连贯 conn *amqp.Connection // 连贯状态 isConnected bool // 关机提醒 done chan bool // amqp闪断告诉 notifyClose chan *amqp.Error // 多队列(channel),此处认为一个channel治理一条队列 qs []*Queue}type config struct { // 连贯AMQP DSN构建驱动 driver Driver // 最大音讯重发次数 maxSendRetries int // 最大重连次数 maxReconnects int // 重连延迟时间 reconnectDelay time.Duration // 最大发送积压数 maxOverstock int64}func New(opts ...Option) (*Connection, error) { // 建设默认连贯对象 conn := &Connection{ // 生成默认配置 config: &config{ reconnectDelay: time.Second, maxReconnects: 0, maxSendRetries: 3, }, } for _, opt := range opts { // 配置写入 opt(conn.config) } // 根底必须配置查看 if conn.config.driver == nil { return nil, missingConnectionDsnDriver } // 断线重连 go conn.handleReconnect() return conn, nil}func (c *Connection) handleReconnect() { tryConnects := 0 for { c.isConnected = false if err := c.connect(); err != nil { if c.config.maxReconnects > 0 && tryConnects > c.config.maxReconnects { log.Fatalf("[AMQP] Reconnection times exceeded!(%d)", tryConnects) return } tryConnects += 1 log.Printf("Failed to connect. %s Retrying...(%d)", err, tryConnects) time.Sleep(c.config.reconnectDelay) } else { // clear try connect tryConnects = 0 } // 期待下一步信号告诉 select { case <-c.done: return case <-c.notifyClose: } }}//func (c *Connection) connect() error { url := c.config.driver.Url() log.Printf("[amqp] connected. %s", url) conn, err := amqp.Dial(url) if err != nil { return err } c.conn = conn c.notifyClose = make(chan *amqp.Error) c.conn.NotifyClose(c.notifyClose) c.isConnected = true // channel重连 c.queueReconnect() return nil}// channel重连// 当连贯闪断当前,须要从新建设新的连贯,所以,所有的channel也须要进行新的连贯func (c *Connection) queueReconnect() { for _, q := range c.qs { if err := q.Reload(c.conn); err != nil { log.Println(err) } }}// 优雅敞开,// 敞开channel// 敞开闪断重连机制// 敞开connectionfunc (c *Connection) Close() error { if c.isConnected { return alreadyClosed } var wg sync.WaitGroup // 批量敞开 for i := 0; i < len(c.qs); i++ { wg.Add(1) go func(idx int, group *sync.WaitGroup) { _ = c.qs[idx].Close() group.Done() }(i, &wg) } wg.Wait() _ = c.conn.Close() // 敞开 close(c.done) return nil}// 生成单Channel治理,让一个channel治理一条队列的发送与生产func (c *Connection) Queue(name, exchange, routingKey string) (*Queue, error) { q := &Queue{ name: name, exchange: amqp.ExchangeDirect, exchangeName: exchange, routingKey: routingKey, maxOverstock: c.config.maxOverstock, } if c.isConnected == true { if err := q.Reload(c.conn); err != nil { return nil, err } } c.qs = append(c.qs, q) return q, nil}
注
刚开始写一些技术分享的,很多中央或者构造可能写的比拟毛糙,欢送各位执教、交换。如果有不理解或者新的想法,也能够评论留言沟通。thanks!