共计 8161 个字符,预计需要花费 21 分钟才能阅读完成。
仓库地址
go get -u github.com/lazychanger/go-amqp
注意事项
- rabbitmq 将连贯与管道离开,即
connection
与channel
。connection
是实体连贯,而channel
是逻辑连贯。所以当咱们多线程应用时候,应该是单connection
+ 多channel
connection
闪断当前,channel
也会生效。所以重连当前,还须要从新建设channel
- 所有的
推送
,生产
都是基于channel
,所以channel
重连当前,还须要从新开启推送
、生产
- 在
connection
闪断当前。推送
是有效的,这个时候再去推送,必定会报错。所以咱们重连机制,必须要保障,connection
在闪断时候,也要保障推送
接口失常
单 channel
治理
思考
咱们先定义,channel
到底代表什么?我认为单个 channel
操作单条队列的发送与生产,并且须要对主服务提供重启。既然要提供重启,那就要保障单个 channel
对象的生产与推送,不容许间接与 channel
操作,须要进行操作隔离,避免闪断与重启期间,操作有效的 channel
。要实现操作隔离,隔离 生产
比较简单,收集好相干操作的 handle
,重启当前,从新接入handle
即可。然而隔离 推送
比拟艰难,因为对于用户来说,推送
是实时的,而且咱们也无奈保障重连工夫是多久,所以让用户期待重连是不可行的,那么该怎么操作呢?这里我抉择的是内置一条缓存队列,用户 推送
数据,先进入缓存队列,再由缓存队列的消费者取出交由真正的队列,同时也要留神重连工夫过长导致内存溢出、服务端重启导致替换队列还清空。所以咱们设计的 channel
应该满足以下几点
- 内置推送替换队列
- 隔离生产
-
提供重启机制
- 重启
channel
- 重启
生产
- 重启
推送
- 重启
- 提供优雅敞开(关机时候须要保障推送替换队列齐全推送结束)
代码案例
package go_amqp
import (
"encoding/json"
"errors"
"fmt"
"github.com/lazychanger/go-amqp/tools"
"github.com/streadway/amqp"
"log"
"sync"
"time"
)
type Queue struct {
sync.Mutex
// 队列名
name string
// 交换机名
exchangeName string
// 队列绑定交换机路由 KEY
routingKey string
// 交换机模式
exchange string
// 最大内置队列长度
maxOverstock int64
conn *amqp.Connection
channel *amqp.Channel
// 1. 为什么不采纳 slice。因为 golang 的 slice 咱们不不便作为队列操作,每次入出都会引发大量的内存操作。所以采纳 map 类型
// 2. 为什么采纳 sync.map。因为入与出是同时执行,异步操作,如果间接采纳 map 类型,会导致脏读、幻读等问题,咱们须要对 map 类型的读写须要锁,而 golang 有内置残缺的读写锁 map 模型
// 3. 采纳 map 类型当前,须要咱们本人保护头出尾入以及长度,所以这里有 qMaxI、qMinI、ql 进行数据记录
q *sync.Map
// 最大 index
qMaxI int64
// 最小 index
qMinI int64
// 内置队列长度
ql int64
// 是否启动开释库存
isReleaseStock bool
// 是否重启生产
isReloadConsume int
// 是否进行生产
isStopConsume bool
// 是否筹备好
already int
close bool
// 消费者
cs []consume}
type MessageStatus int8
const (
AlreadyStop = 0
AlreadyReload = 1
AlreadySucceed = 2
MessageStatusSucceed MessageStatus = 1
MessageStatusError MessageStatus = 2
MessageStatusRequeue MessageStatus = 3
)
// 提供重启服务
func (q *Queue) Reload(conn *amqp.Connection) error {
// 先敞开状态
q.conn = conn
// 标识重启中
q.already = AlreadyReload
return q.init()}
// 初始化操作
func (q *Queue) init() error {
var (
ch *amqp.Channel
err error
)
log.Println("[amqp] queue init start")
q.Lock()
ch, err = q.conn.Channel()
if err != nil {return err}
// 创立交换机
if err = ch.ExchangeDeclare(q.exchangeName, q.exchange, false, false, false, false, nil); err != nil {return errors.New(fmt.Sprintf("[amqp] exchange declare failed, err: %s", err))
}
// 创立队列
if _, err = ch.QueueDeclare(q.name, false, false, false, false, nil); err != nil {return errors.New(fmt.Sprintf("[amqp] queue declare failed, err: %s", err))
}
// 交换机绑定队列
if err = ch.QueueBind(q.name, q.routingKey, q.exchangeName, false, nil); err != nil {return errors.New(fmt.Sprintf("[amqp] queue bind failed, err: %s", err))
}
// 管道替换
q.channel = ch
// 告知曾经筹备结束
if q.already == AlreadyReload {
q.already = AlreadySucceed
// 从新触发生产
q.reloadConsume()
if !q.isReleaseStock && q.ql > 0 {log.Println("init release stock")
// 从新触发
go q.releaseStock()}
}
q.Unlock()
log.Println("[amqp] queue init end")
return nil
}
// 对推送简略封装一下,使 json 对象推送更加简便
func (q *Queue) PublishJson(v interface{}) error {body, err := json.Marshal(v)
if err != nil {return err}
return q.Publish(body)
}
// 原始推送,要求[]byte
func (q *Queue) Publish(data []byte) error {
// 查看库存,避免溢出
if q.maxOverstock > 0 && q.ql > q.maxOverstock {return publishOverstock}
if q.close {return errors.New("service closing")
}
// 启动锁,避免开释库存时候,独特操作导致脏写
q.Lock()
// 避免并发 map 创立
if q.q == nil {q.q = &sync.Map{}
}
// 减少最大下标
q.qMaxI++
// 减少最大长度
q.ql++
// 存储值
q.q.Store(q.qMaxI, data)
q.Unlock()
// 查看开释库存是否启动,未启动并且 channel 曾经筹备结束,就启动库存开释
if !q.isReleaseStock && q.already == AlreadySucceed {go q.releaseStock()
}
log.Printf("published,now %d", q.ql)
return nil
}
// 内置队列生产,库存开释
func (q *Queue) releaseStock() {
// 判断是否反复启动
q.Lock()
if q.isReleaseStock {q.Unlock()
log.Println("[amqp] release stock already run")
return
}
// 标记服务曾经启动
q.isReleaseStock = true
q.Unlock()
log.Println("[amqp] release stock")
for {
// 如果库存为空或者 channel 还未筹备好就敞开循环
if q.ql == 0 || q.already != AlreadySucceed {break}
// 先将以后长度取出,避免循环时候批改,变成脏读
l := q.ql
// 理论启动以后轮次库存开释
for i := int64(0); i < l; i++ {
if q.already != AlreadySucceed {break}
// 对库存
q.Lock()
log.Printf("internal queues length: %d", q.ql)
// 对库存最小下标进行 +1
q.qMinI++
// 缩小库存最大数
q.ql--
// 锁期间,顶部索引不变
min := q.qMinI
q.Unlock()
// 读取内容
body, has := q.q.Load(min)
// 预防脏读
if has && body != nil {
// 推送
_ = q.publish(body.([]byte))
} else {log.Println("[amqp] data error")
}
// 开释 map 空间
q.q.Delete(min)
}
// 本轮库存开释曾经完结,提早执行 3 秒后执行下一轮
ticker := time.NewTicker(time.Second * 3)
select {
case <-ticker.C:
ticker.Stop()}
}
// 标记敞开
q.isReleaseStock = false
}
// 理论 channel 向队列发送数据
func (q *Queue) publish(data []byte) error {
return q.channel.Publish(q.exchangeName, q.routingKey, false, false, amqp.Publishing{Body: data,})
}
// 增加生产内容,先存储,期待服务启动当前触发
func (q *Queue) Consume(name string, consumeFunc ConsumeFunc, repeat int) error {
q.cs = append(q.cs, consume{repeat: tools.IF(repeat <= 0, 1, repeat).(int),
consumeFunc: consumeFunc,
name: name,
})
// 尝试启动生产
q.reloadConsume()
return nil
}
// 暂停生产
func (q *Queue) StopConsume() {q.isStopConsume = true}
// 启动生产
func (q *Queue) StartConsume() {
q.isStopConsume = false
q.reloadConsume()}
// 理论触发生产
func (q *Queue) reloadConsume() {
// 如果未启动,间接返回
if q.already != AlreadySucceed || q.isStopConsume {return}
// 推送重启生产
q.isReloadConsume++
// 记录以后生产重启值
reloadConsume := q.isReloadConsume
// 记录以后 channel 重启值
for i, c := range q.cs {
// 并发生产
for l := 0; l < c.repeat; l++ {name := fmt.Sprintf("%s_%d-%d", c.name, i, l)
msgs, err := q.channel.Consume(q.name, name, false, false, false, false, nil)
if err != nil {log.Fatalf("[AMQP] customer register err;name: %s, %s", name, err)
} else {go func(c ConsumeFunc, consumeName string, reloadConsume int) {
for msg := range msgs {switch c(msg.Body, consumeName) {
case MessageStatusSucceed:
case MessageStatusError:
_ = msg.Ack(true)
break
case MessageStatusRequeue:
_ = msg.Reject(true)
break
}
// 如果 channel 重启或者生产重启,都完结以后生产,避免溢出,或者正在敞开
if q.already != AlreadySucceed || q.isReloadConsume != reloadConsume || q.close || q.isStopConsume {break}
}
}(c.consumeFunc, name, reloadConsume)
}
}
}
}
// 优雅重启
func (q *Queue) Close() error {
// 先标记敞开
q.close = true
retry := 0
for {
if q.ql > 0 {
if q.already == AlreadySucceed {
if q.isReleaseStock == false {q.releaseStock()
}
} else {
retry++
// 如果 channel 没有筹备好,内置队列也没有开释完,则重试三次,三次还没有解决好,就放弃重试
if retry > 3 {break}
}
} else {break}
ticker := time.NewTicker(time.Second / 2)
select {
case <-ticker.C:
ticker.Stop()}
}
return q.channel.Close()}
type consume struct {
name string
consumeFunc ConsumeFunc
repeat int
}
type ConsumeFunc func(data []byte, name string) MessageStatus
多 channel
治理
思考
下面介绍了单个 channel
外部状态保护,那么当初就要开始对这些 channel
进行治理。治理内容如下:
channel
注册- 断线重启
- 优雅敞开
代码
package go_amqp
import (
"github.com/streadway/amqp"
"log"
"sync"
"time"
)
type Connection struct {
// 配置
config *config
// amqp 连贯
conn *amqp.Connection
// 连贯状态
isConnected bool
// 关机提醒
done chan bool
// amqp 闪断告诉
notifyClose chan *amqp.Error
// 多队列(channel),此处认为一个 channel 治理一条队列
qs []*Queue}
type config struct {
// 连贯 AMQP DSN 构建驱动
driver Driver
// 最大音讯重发次数
maxSendRetries int
// 最大重连次数
maxReconnects int
// 重连延迟时间
reconnectDelay time.Duration
// 最大发送积压数
maxOverstock int64
}
func New(opts ...Option) (*Connection, error) {
// 建设默认连贯对象
conn := &Connection{
// 生成默认配置
config: &config{
reconnectDelay: time.Second,
maxReconnects: 0,
maxSendRetries: 3,
},
}
for _, opt := range opts {
// 配置写入
opt(conn.config)
}
// 根底必须配置查看
if conn.config.driver == nil {return nil, missingConnectionDsnDriver}
// 断线重连
go conn.handleReconnect()
return conn, nil
}
func (c *Connection) handleReconnect() {
tryConnects := 0
for {
c.isConnected = false
if err := c.connect(); err != nil {
if c.config.maxReconnects > 0 && tryConnects > c.config.maxReconnects {log.Fatalf("[AMQP] Reconnection times exceeded!(%d)", tryConnects)
return
}
tryConnects += 1
log.Printf("Failed to connect. %s Retrying...(%d)", err, tryConnects)
time.Sleep(c.config.reconnectDelay)
} else {
// clear try connect
tryConnects = 0
}
// 期待下一步信号告诉
select {
case <-c.done:
return
case <-c.notifyClose:
}
}
}
//
func (c *Connection) connect() error {url := c.config.driver.Url()
log.Printf("[amqp] connected. %s", url)
conn, err := amqp.Dial(url)
if err != nil {return err}
c.conn = conn
c.notifyClose = make(chan *amqp.Error)
c.conn.NotifyClose(c.notifyClose)
c.isConnected = true
// channel 重连
c.queueReconnect()
return nil
}
// channel 重连
// 当连贯闪断当前,须要从新建设新的连贯,所以,所有的 channel 也须要进行新的连贯
func (c *Connection) queueReconnect() {
for _, q := range c.qs {if err := q.Reload(c.conn); err != nil {log.Println(err)
}
}
}
// 优雅敞开,// 敞开 channel
// 敞开闪断重连机制
// 敞开 connection
func (c *Connection) Close() error {
if c.isConnected {return alreadyClosed}
var wg sync.WaitGroup
// 批量敞开
for i := 0; i < len(c.qs); i++ {wg.Add(1)
go func(idx int, group *sync.WaitGroup) {_ = c.qs[idx].Close()
group.Done()}(i, &wg)
}
wg.Wait()
_ = c.conn.Close()
// 敞开
close(c.done)
return nil
}
// 生成单 Channel 治理,让一个 channel 治理一条队列的发送与生产
func (c *Connection) Queue(name, exchange, routingKey string) (*Queue, error) {
q := &Queue{
name: name,
exchange: amqp.ExchangeDirect,
exchangeName: exchange,
routingKey: routingKey,
maxOverstock: c.config.maxOverstock,
}
if c.isConnected == true {if err := q.Reload(c.conn); err != nil {return nil, err}
}
c.qs = append(c.qs, q)
return q, nil
}
注
刚开始写一些技术分享的,很多中央或者构造可能写的比拟毛糙,欢送各位执教、交换。如果有不理解或者新的想法,也能够评论留言沟通。thanks!
正文完