关于golang:简单设计goamqp

2次阅读

共计 8161 个字符,预计需要花费 21 分钟才能阅读完成。

仓库地址

go get -u github.com/lazychanger/go-amqp

注意事项

  • rabbitmq 将连贯与管道离开,即 connectionchannelconnection是实体连贯,而 channel 是逻辑连贯。所以当咱们多线程应用时候,应该是单connection+ 多channel
  • connection闪断当前,channel也会生效。所以重连当前,还须要从新建设channel
  • 所有的 推送 生产 都是基于 channel,所以channel 重连当前,还须要从新开启 推送 生产
  • connection 闪断当前。推送 是有效的,这个时候再去推送,必定会报错。所以咱们重连机制,必须要保障,connection在闪断时候,也要保障 推送 接口失常

channel 治理

思考

咱们先定义,channel到底代表什么?我认为单个 channel 操作单条队列的发送与生产,并且须要对主服务提供重启。既然要提供重启,那就要保障单个 channel 对象的生产与推送,不容许间接与 channel 操作,须要进行操作隔离,避免闪断与重启期间,操作有效的 channel。要实现操作隔离,隔离 生产 比较简单,收集好相干操作的 handle,重启当前,从新接入handle 即可。然而隔离 推送 比拟艰难,因为对于用户来说,推送 是实时的,而且咱们也无奈保障重连工夫是多久,所以让用户期待重连是不可行的,那么该怎么操作呢?这里我抉择的是内置一条缓存队列,用户 推送 数据,先进入缓存队列,再由缓存队列的消费者取出交由真正的队列,同时也要留神重连工夫过长导致内存溢出、服务端重启导致替换队列还清空。所以咱们设计的 channel 应该满足以下几点

  • 内置推送替换队列
  • 隔离生产
  • 提供重启机制

    • 重启channel
    • 重启 生产
    • 重启 推送
  • 提供优雅敞开(关机时候须要保障推送替换队列齐全推送结束)

代码案例


package go_amqp

import (
    "encoding/json"
    "errors"
    "fmt"
    "github.com/lazychanger/go-amqp/tools"
    "github.com/streadway/amqp"
    "log"
    "sync"
    "time"
)

type Queue struct {
    sync.Mutex
    // 队列名
    name string
    // 交换机名
    exchangeName string
    // 队列绑定交换机路由 KEY
    routingKey string
    // 交换机模式
    exchange string
    // 最大内置队列长度
    maxOverstock int64

    conn    *amqp.Connection
    channel *amqp.Channel

    // 1. 为什么不采纳 slice。因为 golang 的 slice 咱们不不便作为队列操作,每次入出都会引发大量的内存操作。所以采纳 map 类型
    // 2. 为什么采纳 sync.map。因为入与出是同时执行,异步操作,如果间接采纳 map 类型,会导致脏读、幻读等问题,咱们须要对 map 类型的读写须要锁,而 golang 有内置残缺的读写锁 map 模型
    // 3. 采纳 map 类型当前,须要咱们本人保护头出尾入以及长度,所以这里有 qMaxI、qMinI、ql 进行数据记录
    q *sync.Map
    // 最大 index
    qMaxI int64
    // 最小 index
    qMinI int64
    // 内置队列长度
    ql int64
    // 是否启动开释库存
    isReleaseStock bool
    // 是否重启生产
    isReloadConsume int
    // 是否进行生产
    isStopConsume bool
    // 是否筹备好
    already int

    close bool
    // 消费者
    cs []consume}

type MessageStatus int8

const (
    AlreadyStop    = 0
    AlreadyReload  = 1
    AlreadySucceed = 2

    MessageStatusSucceed MessageStatus = 1
    MessageStatusError   MessageStatus = 2
    MessageStatusRequeue MessageStatus = 3
)

// 提供重启服务
func (q *Queue) Reload(conn *amqp.Connection) error {
    // 先敞开状态
    q.conn = conn
    // 标识重启中
    q.already = AlreadyReload
    return q.init()}

// 初始化操作
func (q *Queue) init() error {
    var (
        ch  *amqp.Channel
        err error
    )
    log.Println("[amqp] queue init start")
    q.Lock()
    ch, err = q.conn.Channel()
    if err != nil {return err}

    // 创立交换机
    if err = ch.ExchangeDeclare(q.exchangeName, q.exchange, false, false, false, false, nil); err != nil {return errors.New(fmt.Sprintf("[amqp] exchange declare failed, err: %s", err))
    }
    // 创立队列
    if _, err = ch.QueueDeclare(q.name, false, false, false, false, nil); err != nil {return errors.New(fmt.Sprintf("[amqp] queue declare failed, err: %s", err))
    }
    // 交换机绑定队列
    if err = ch.QueueBind(q.name, q.routingKey, q.exchangeName, false, nil); err != nil {return errors.New(fmt.Sprintf("[amqp] queue bind failed, err: %s", err))
    }
    // 管道替换
    q.channel = ch
    // 告知曾经筹备结束
    if q.already == AlreadyReload {
        q.already = AlreadySucceed

        // 从新触发生产
        q.reloadConsume()

        if !q.isReleaseStock && q.ql > 0 {log.Println("init release stock")
            // 从新触发
            go q.releaseStock()}
    }

    q.Unlock()

    log.Println("[amqp] queue init end")

    return nil
}

// 对推送简略封装一下,使 json 对象推送更加简便
func (q *Queue) PublishJson(v interface{}) error {body, err := json.Marshal(v)
    if err != nil {return err}

    return q.Publish(body)
}

// 原始推送,要求[]byte
func (q *Queue) Publish(data []byte) error {
    // 查看库存,避免溢出
    if q.maxOverstock > 0 && q.ql > q.maxOverstock {return publishOverstock}

    if q.close {return errors.New("service closing")
    }

    // 启动锁,避免开释库存时候,独特操作导致脏写
    q.Lock()
    // 避免并发 map 创立
    if q.q == nil {q.q = &sync.Map{}
    }
    // 减少最大下标
    q.qMaxI++
    // 减少最大长度
    q.ql++
    // 存储值
    q.q.Store(q.qMaxI, data)
    q.Unlock()

    // 查看开释库存是否启动,未启动并且 channel 曾经筹备结束,就启动库存开释
    if !q.isReleaseStock && q.already == AlreadySucceed {go q.releaseStock()
    }

    log.Printf("published,now %d", q.ql)
    return nil
}

// 内置队列生产,库存开释
func (q *Queue) releaseStock() {
    // 判断是否反复启动
    q.Lock()
    if q.isReleaseStock {q.Unlock()
        log.Println("[amqp] release stock already run")
        return
    }
    // 标记服务曾经启动
    q.isReleaseStock = true
    q.Unlock()
    log.Println("[amqp] release stock")
    for {
        // 如果库存为空或者 channel 还未筹备好就敞开循环
        if q.ql == 0 || q.already != AlreadySucceed {break}
        // 先将以后长度取出,避免循环时候批改,变成脏读
        l := q.ql
        // 理论启动以后轮次库存开释
        for i := int64(0); i < l; i++ {
            if q.already != AlreadySucceed {break}
            // 对库存
            q.Lock()
            log.Printf("internal queues length: %d", q.ql)
            // 对库存最小下标进行 +1
            q.qMinI++
            // 缩小库存最大数
            q.ql--
            // 锁期间,顶部索引不变
            min := q.qMinI
            q.Unlock()

            // 读取内容
            body, has := q.q.Load(min)
            // 预防脏读
            if has && body != nil {
                // 推送
                _ = q.publish(body.([]byte))
            } else {log.Println("[amqp] data error")
            }
            // 开释 map 空间
            q.q.Delete(min)
        }

        // 本轮库存开释曾经完结,提早执行 3 秒后执行下一轮
        ticker := time.NewTicker(time.Second * 3)
        select {
        case <-ticker.C:
            ticker.Stop()}
    }
    // 标记敞开
    q.isReleaseStock = false
}

// 理论 channel 向队列发送数据
func (q *Queue) publish(data []byte) error {
    return q.channel.Publish(q.exchangeName, q.routingKey, false, false, amqp.Publishing{Body: data,})
}

// 增加生产内容,先存储,期待服务启动当前触发
func (q *Queue) Consume(name string, consumeFunc ConsumeFunc, repeat int) error {
    q.cs = append(q.cs, consume{repeat:      tools.IF(repeat <= 0, 1, repeat).(int),
        consumeFunc: consumeFunc,
        name:        name,
    })
    // 尝试启动生产
    q.reloadConsume()

    return nil
}

// 暂停生产
func (q *Queue) StopConsume() {q.isStopConsume = true}

// 启动生产
func (q *Queue) StartConsume() {
    q.isStopConsume = false
    q.reloadConsume()}

// 理论触发生产
func (q *Queue) reloadConsume() {
    // 如果未启动,间接返回
    if q.already != AlreadySucceed || q.isStopConsume {return}
    // 推送重启生产
    q.isReloadConsume++
    // 记录以后生产重启值
    reloadConsume := q.isReloadConsume
    // 记录以后 channel 重启值
    for i, c := range q.cs {
        // 并发生产
        for l := 0; l < c.repeat; l++ {name := fmt.Sprintf("%s_%d-%d", c.name, i, l)
            msgs, err := q.channel.Consume(q.name, name, false, false, false, false, nil)
            if err != nil {log.Fatalf("[AMQP] customer register err;name: %s, %s", name, err)
            } else {go func(c ConsumeFunc, consumeName string, reloadConsume int) {
                    for msg := range msgs {switch c(msg.Body, consumeName) {
                        case MessageStatusSucceed:
                        case MessageStatusError:
                            _ = msg.Ack(true)
                            break
                        case MessageStatusRequeue:
                            _ = msg.Reject(true)
                            break
                        }
                        // 如果 channel 重启或者生产重启,都完结以后生产,避免溢出,或者正在敞开
                        if q.already != AlreadySucceed || q.isReloadConsume != reloadConsume || q.close || q.isStopConsume {break}
                    }
                }(c.consumeFunc, name, reloadConsume)
            }
        }
    }
}

// 优雅重启
func (q *Queue) Close() error {
    // 先标记敞开
    q.close = true

    retry := 0

    for {

        if q.ql > 0 {
            if q.already == AlreadySucceed {
                if q.isReleaseStock == false {q.releaseStock()
                }
            } else {
                retry++
                // 如果 channel 没有筹备好,内置队列也没有开释完,则重试三次,三次还没有解决好,就放弃重试
                if retry > 3 {break}
            }
        } else {break}

        ticker := time.NewTicker(time.Second / 2)
        select {
        case <-ticker.C:
            ticker.Stop()}
    }

    return q.channel.Close()}

type consume struct {
    name        string
    consumeFunc ConsumeFunc
    repeat      int
}
type ConsumeFunc func(data []byte, name string) MessageStatus

channel 治理

思考

下面介绍了单个 channel 外部状态保护,那么当初就要开始对这些 channel 进行治理。治理内容如下:

  • channel注册
  • 断线重启
  • 优雅敞开

代码

package go_amqp

import (
    "github.com/streadway/amqp"
    "log"
    "sync"
    "time"
)

type Connection struct {
    // 配置
    config *config
    // amqp 连贯
    conn *amqp.Connection
    // 连贯状态
    isConnected bool
    // 关机提醒
    done chan bool
    // amqp 闪断告诉
    notifyClose chan *amqp.Error
    // 多队列(channel),此处认为一个 channel 治理一条队列
    qs []*Queue}


type config struct {
    // 连贯 AMQP DSN 构建驱动
    driver Driver

    // 最大音讯重发次数
    maxSendRetries int
    // 最大重连次数
    maxReconnects int
    // 重连延迟时间
    reconnectDelay time.Duration

    // 最大发送积压数
    maxOverstock int64
}


func New(opts ...Option) (*Connection, error) {
    // 建设默认连贯对象
    conn := &Connection{
        // 生成默认配置
        config: &config{
            reconnectDelay: time.Second,
            maxReconnects:  0,
            maxSendRetries: 3,
        },
    }

    for _, opt := range opts {
        // 配置写入
        opt(conn.config)
    }

    // 根底必须配置查看
    if conn.config.driver == nil {return nil, missingConnectionDsnDriver}

    // 断线重连
    go conn.handleReconnect()

    return conn, nil
}

func (c *Connection) handleReconnect() {
    tryConnects := 0
    for {
        c.isConnected = false
        if err := c.connect(); err != nil {
            if c.config.maxReconnects > 0 && tryConnects > c.config.maxReconnects {log.Fatalf("[AMQP] Reconnection times exceeded!(%d)", tryConnects)
                return
            }

            tryConnects += 1
            log.Printf("Failed to connect. %s Retrying...(%d)", err, tryConnects)
            time.Sleep(c.config.reconnectDelay)
        } else {
            // clear try connect
            tryConnects = 0
        }
        // 期待下一步信号告诉
        select {
        case <-c.done:
            return
        case <-c.notifyClose:
        }
    }
}

//
func (c *Connection) connect() error {url := c.config.driver.Url()
    log.Printf("[amqp] connected. %s", url)
    conn, err := amqp.Dial(url)
    if err != nil {return err}
    c.conn = conn

    c.notifyClose = make(chan *amqp.Error)
    c.conn.NotifyClose(c.notifyClose)

    c.isConnected = true
    // channel 重连
    c.queueReconnect()
    return nil
}

// channel 重连
// 当连贯闪断当前,须要从新建设新的连贯,所以,所有的 channel 也须要进行新的连贯
func (c *Connection) queueReconnect() {
    for _, q := range c.qs {if err := q.Reload(c.conn); err != nil {log.Println(err)
        }
    }
}

// 优雅敞开,// 敞开 channel
// 敞开闪断重连机制
// 敞开 connection
func (c *Connection) Close() error {
    if c.isConnected {return alreadyClosed}

    var wg sync.WaitGroup
    // 批量敞开
    for i := 0; i < len(c.qs); i++ {wg.Add(1)
        go func(idx int, group *sync.WaitGroup) {_ = c.qs[idx].Close()
            group.Done()}(i, &wg)
    }

    wg.Wait()

    _ = c.conn.Close()

    // 敞开
    close(c.done)
    return nil
}

// 生成单 Channel 治理,让一个 channel 治理一条队列的发送与生产
func (c *Connection) Queue(name, exchange, routingKey string) (*Queue, error) {
    q := &Queue{
        name:         name,
        exchange:     amqp.ExchangeDirect,
        exchangeName: exchange,
        routingKey:   routingKey,
        maxOverstock: c.config.maxOverstock,
    }

    if c.isConnected == true {if err := q.Reload(c.conn); err != nil {return nil, err}
    }
    c.qs = append(c.qs, q)
    return q, nil
}

刚开始写一些技术分享的,很多中央或者构造可能写的比拟毛糙,欢送各位执教、交换。如果有不理解或者新的想法,也能够评论留言沟通。thanks!

正文完
 0