仓库地址

go get -u github.com/lazychanger/go-amqp

注意事项

  • rabbitmq将连贯与管道离开,即connectionchannelconnection是实体连贯,而channel是逻辑连贯。所以当咱们多线程应用时候,应该是单connection+多channel
  • connection闪断当前,channel也会生效。所以重连当前,还须要从新建设channel
  • 所有的推送生产都是基于channel,所以channel重连当前,还须要从新开启推送生产
  • connection闪断当前。推送是有效的,这个时候再去推送,必定会报错。所以咱们重连机制,必须要保障,connection在闪断时候,也要保障推送接口失常

channel治理

思考

咱们先定义,channel到底代表什么?我认为单个channel操作单条队列的发送与生产,并且须要对主服务提供重启。既然要提供重启,那就要保障单个channel对象的生产与推送,不容许间接与channel操作,须要进行操作隔离,避免闪断与重启期间,操作有效的channel。要实现操作隔离,隔离生产比较简单,收集好相干操作的handle,重启当前,从新接入handle即可。然而隔离推送比拟艰难,因为对于用户来说,推送是实时的,而且咱们也无奈保障重连工夫是多久,所以让用户期待重连是不可行的,那么该怎么操作呢?这里我抉择的是内置一条缓存队列,用户推送数据,先进入缓存队列,再由缓存队列的消费者取出交由真正的队列,同时也要留神重连工夫过长导致内存溢出、服务端重启导致替换队列还清空。所以咱们设计的channel应该满足以下几点

  • 内置推送替换队列
  • 隔离生产
  • 提供重启机制

    • 重启channel
    • 重启生产
    • 重启推送
  • 提供优雅敞开(关机时候须要保障推送替换队列齐全推送结束)

代码案例

package go_amqpimport (    "encoding/json"    "errors"    "fmt"    "github.com/lazychanger/go-amqp/tools"    "github.com/streadway/amqp"    "log"    "sync"    "time")type Queue struct {    sync.Mutex    // 队列名    name string    // 交换机名    exchangeName string    // 队列绑定交换机路由KEY    routingKey string    // 交换机模式    exchange string    // 最大内置队列长度    maxOverstock int64    conn    *amqp.Connection    channel *amqp.Channel    // 1.为什么不采纳slice。因为golang的slice咱们不不便作为队列操作,每次入出都会引发大量的内存操作。所以采纳 map 类型    // 2.为什么采纳sync.map。因为入与出是同时执行,异步操作,如果间接采纳map类型,会导致脏读、幻读等问题,咱们须要对map类型的读写须要锁,而golang有内置残缺的读写锁map模型    // 3.采纳map类型当前,须要咱们本人保护头出尾入以及长度,所以这里有qMaxI、qMinI、ql进行数据记录    q *sync.Map    // 最大index    qMaxI int64    // 最小index    qMinI int64    // 内置队列长度    ql int64    // 是否启动开释库存    isReleaseStock bool    // 是否重启生产    isReloadConsume int    // 是否进行生产    isStopConsume bool    // 是否筹备好    already int    close bool    // 消费者    cs []consume}type MessageStatus int8const (    AlreadyStop    = 0    AlreadyReload  = 1    AlreadySucceed = 2    MessageStatusSucceed MessageStatus = 1    MessageStatusError   MessageStatus = 2    MessageStatusRequeue MessageStatus = 3)// 提供重启服务func (q *Queue) Reload(conn *amqp.Connection) error {    // 先敞开状态    q.conn = conn    // 标识重启中    q.already = AlreadyReload    return q.init()}// 初始化操作func (q *Queue) init() error {    var (        ch  *amqp.Channel        err error    )    log.Println("[amqp] queue init start")    q.Lock()    ch, err = q.conn.Channel()    if err != nil {        return err    }    // 创立交换机    if err = ch.ExchangeDeclare(q.exchangeName, q.exchange, false, false, false, false, nil); err != nil {        return errors.New(fmt.Sprintf("[amqp] exchange declare failed, err: %s", err))    }    // 创立队列    if _, err = ch.QueueDeclare(q.name, false, false, false, false, nil); err != nil {        return errors.New(fmt.Sprintf("[amqp] queue declare failed, err: %s", err))    }    // 交换机绑定队列    if err = ch.QueueBind(q.name, q.routingKey, q.exchangeName, false, nil); err != nil {        return errors.New(fmt.Sprintf("[amqp] queue bind failed, err: %s", err))    }    // 管道替换    q.channel = ch    // 告知曾经筹备结束    if q.already == AlreadyReload {        q.already = AlreadySucceed        // 从新触发生产        q.reloadConsume()        if !q.isReleaseStock && q.ql > 0 {            log.Println("init release stock")            // 从新触发            go q.releaseStock()        }    }    q.Unlock()    log.Println("[amqp] queue init end")    return nil}// 对推送简略封装一下,使json对象推送更加简便func (q *Queue) PublishJson(v interface{}) error {    body, err := json.Marshal(v)    if err != nil {        return err    }    return q.Publish(body)}// 原始推送,要求[]bytefunc (q *Queue) Publish(data []byte) error {    // 查看库存,避免溢出    if q.maxOverstock > 0 && q.ql > q.maxOverstock {        return publishOverstock    }    if q.close {        return errors.New("service closing")    }    // 启动锁,避免开释库存时候,独特操作导致脏写    q.Lock()    // 避免并发map创立    if q.q == nil {        q.q = &sync.Map{}    }    // 减少最大下标    q.qMaxI++    // 减少最大长度    q.ql++    // 存储值    q.q.Store(q.qMaxI, data)    q.Unlock()    // 查看开释库存是否启动,未启动并且channel曾经筹备结束,就启动库存开释    if !q.isReleaseStock && q.already == AlreadySucceed {        go q.releaseStock()    }    log.Printf("published,now %d", q.ql)    return nil}// 内置队列生产,库存开释func (q *Queue) releaseStock() {    // 判断是否反复启动    q.Lock()    if q.isReleaseStock {        q.Unlock()        log.Println("[amqp] release stock already run")        return    }    // 标记服务曾经启动    q.isReleaseStock = true    q.Unlock()    log.Println("[amqp] release stock")    for {        // 如果库存为空或者channel还未筹备好就敞开循环        if q.ql == 0 || q.already != AlreadySucceed {            break        }        // 先将以后长度取出,避免循环时候批改,变成脏读        l := q.ql        // 理论启动以后轮次库存开释        for i := int64(0); i < l; i++ {            if q.already != AlreadySucceed {                break            }            // 对库存            q.Lock()            log.Printf("internal queues length: %d", q.ql)            // 对库存最小下标进行+1            q.qMinI++            // 缩小库存最大数            q.ql--            // 锁期间,顶部索引不变            min := q.qMinI            q.Unlock()            // 读取内容            body, has := q.q.Load(min)            // 预防脏读            if has && body != nil {                // 推送                _ = q.publish(body.([]byte))            } else {                log.Println("[amqp] data error")            }            // 开释map空间            q.q.Delete(min)        }        // 本轮库存开释曾经完结,提早执行3秒后执行下一轮        ticker := time.NewTicker(time.Second * 3)        select {        case <-ticker.C:            ticker.Stop()        }    }    // 标记敞开    q.isReleaseStock = false}// 理论channel向队列发送数据func (q *Queue) publish(data []byte) error {    return q.channel.Publish(q.exchangeName, q.routingKey, false, false, amqp.Publishing{        Body: data,    })}// 增加生产内容,先存储,期待服务启动当前触发func (q *Queue) Consume(name string, consumeFunc ConsumeFunc, repeat int) error {    q.cs = append(q.cs, consume{        repeat:      tools.IF(repeat <= 0, 1, repeat).(int),        consumeFunc: consumeFunc,        name:        name,    })    // 尝试启动生产    q.reloadConsume()    return nil}// 暂停生产func (q *Queue) StopConsume() {    q.isStopConsume = true}// 启动生产func (q *Queue) StartConsume() {    q.isStopConsume = false    q.reloadConsume()}// 理论触发生产func (q *Queue) reloadConsume() {    // 如果未启动,间接返回    if q.already != AlreadySucceed || q.isStopConsume {        return    }    // 推送重启生产    q.isReloadConsume++    // 记录以后生产重启值    reloadConsume := q.isReloadConsume    // 记录以后channel重启值    for i, c := range q.cs {        // 并发生产        for l := 0; l < c.repeat; l++ {            name := fmt.Sprintf("%s_%d-%d", c.name, i, l)            msgs, err := q.channel.Consume(q.name, name, false, false, false, false, nil)            if err != nil {                log.Fatalf("[AMQP] customer register err;name: %s, %s", name, err)            } else {                go func(c ConsumeFunc, consumeName string, reloadConsume int) {                    for msg := range msgs {                        switch c(msg.Body, consumeName) {                        case MessageStatusSucceed:                        case MessageStatusError:                            _ = msg.Ack(true)                            break                        case MessageStatusRequeue:                            _ = msg.Reject(true)                            break                        }                        // 如果channel重启或者生产重启,都完结以后生产,避免溢出,或者正在敞开                        if q.already != AlreadySucceed || q.isReloadConsume != reloadConsume || q.close || q.isStopConsume {                            break                        }                    }                }(c.consumeFunc, name, reloadConsume)            }        }    }}// 优雅重启func (q *Queue) Close() error {    // 先标记敞开    q.close = true    retry := 0    for {        if q.ql > 0 {            if q.already == AlreadySucceed {                if q.isReleaseStock == false {                    q.releaseStock()                }            } else {                retry++                // 如果channel没有筹备好,内置队列也没有开释完,则重试三次,三次还没有解决好,就放弃重试                if retry > 3 {                    break                }            }        } else {            break        }        ticker := time.NewTicker(time.Second / 2)        select {        case <-ticker.C:            ticker.Stop()        }    }    return q.channel.Close()}type consume struct {    name        string    consumeFunc ConsumeFunc    repeat      int}type ConsumeFunc func(data []byte, name string) MessageStatus

channel治理

思考

下面介绍了单个channel外部状态保护,那么当初就要开始对这些channel进行治理。治理内容如下:

  • channel注册
  • 断线重启
  • 优雅敞开

代码

package go_amqpimport (    "github.com/streadway/amqp"    "log"    "sync"    "time")type Connection struct {    // 配置    config *config    // amqp连贯    conn *amqp.Connection    // 连贯状态    isConnected bool    // 关机提醒    done chan bool    // amqp闪断告诉    notifyClose chan *amqp.Error    // 多队列(channel),此处认为一个channel治理一条队列    qs []*Queue}type config struct {    // 连贯AMQP DSN构建驱动    driver Driver    // 最大音讯重发次数    maxSendRetries int    // 最大重连次数    maxReconnects int    // 重连延迟时间    reconnectDelay time.Duration    // 最大发送积压数    maxOverstock int64}func New(opts ...Option) (*Connection, error) {    // 建设默认连贯对象    conn := &Connection{        // 生成默认配置        config: &config{            reconnectDelay: time.Second,            maxReconnects:  0,            maxSendRetries: 3,        },    }    for _, opt := range opts {        // 配置写入        opt(conn.config)    }    // 根底必须配置查看    if conn.config.driver == nil {        return nil, missingConnectionDsnDriver    }    // 断线重连    go conn.handleReconnect()    return conn, nil}func (c *Connection) handleReconnect() {    tryConnects := 0    for {        c.isConnected = false        if err := c.connect(); err != nil {            if c.config.maxReconnects > 0 && tryConnects > c.config.maxReconnects {                log.Fatalf("[AMQP] Reconnection times exceeded!(%d)", tryConnects)                return            }            tryConnects += 1            log.Printf("Failed to connect. %s Retrying...(%d)", err, tryConnects)            time.Sleep(c.config.reconnectDelay)        } else {            // clear try connect            tryConnects = 0        }        // 期待下一步信号告诉        select {        case <-c.done:            return        case <-c.notifyClose:        }    }}//func (c *Connection) connect() error {    url := c.config.driver.Url()    log.Printf("[amqp] connected. %s", url)    conn, err := amqp.Dial(url)    if err != nil {        return err    }    c.conn = conn    c.notifyClose = make(chan *amqp.Error)    c.conn.NotifyClose(c.notifyClose)    c.isConnected = true    // channel重连    c.queueReconnect()    return nil}// channel重连// 当连贯闪断当前,须要从新建设新的连贯,所以,所有的channel也须要进行新的连贯func (c *Connection) queueReconnect() {    for _, q := range c.qs {        if err := q.Reload(c.conn); err != nil {            log.Println(err)        }    }}// 优雅敞开,// 敞开channel// 敞开闪断重连机制// 敞开connectionfunc (c *Connection) Close() error {    if c.isConnected {        return alreadyClosed    }    var wg sync.WaitGroup    // 批量敞开    for i := 0; i < len(c.qs); i++ {        wg.Add(1)        go func(idx int, group *sync.WaitGroup) {            _ = c.qs[idx].Close()            group.Done()        }(i, &wg)    }    wg.Wait()    _ = c.conn.Close()    // 敞开    close(c.done)    return nil}// 生成单Channel治理,让一个channel治理一条队列的发送与生产func (c *Connection) Queue(name, exchange, routingKey string) (*Queue, error) {    q := &Queue{        name:         name,        exchange:     amqp.ExchangeDirect,        exchangeName: exchange,        routingKey:   routingKey,        maxOverstock: c.config.maxOverstock,    }    if c.isConnected == true {        if err := q.Reload(c.conn); err != nil {            return nil, err        }    }    c.qs = append(c.qs, q)    return q, nil}

刚开始写一些技术分享的,很多中央或者构造可能写的比拟毛糙,欢送各位执教、交换。如果有不理解或者新的想法,也能够评论留言沟通。thanks!