关于golang:简单设计goamqp

仓库地址

go get -u github.com/lazychanger/go-amqp

注意事项

  • rabbitmq将连贯与管道离开,即connectionchannelconnection是实体连贯,而channel是逻辑连贯。所以当咱们多线程应用时候,应该是单connection+多channel
  • connection闪断当前,channel也会生效。所以重连当前,还须要从新建设channel
  • 所有的推送生产都是基于channel,所以channel重连当前,还须要从新开启推送生产
  • connection闪断当前。推送是有效的,这个时候再去推送,必定会报错。所以咱们重连机制,必须要保障,connection在闪断时候,也要保障推送接口失常

channel治理

思考

咱们先定义,channel到底代表什么?我认为单个channel操作单条队列的发送与生产,并且须要对主服务提供重启。既然要提供重启,那就要保障单个channel对象的生产与推送,不容许间接与channel操作,须要进行操作隔离,避免闪断与重启期间,操作有效的channel。要实现操作隔离,隔离生产比较简单,收集好相干操作的handle,重启当前,从新接入handle即可。然而隔离推送比拟艰难,因为对于用户来说,推送是实时的,而且咱们也无奈保障重连工夫是多久,所以让用户期待重连是不可行的,那么该怎么操作呢?这里我抉择的是内置一条缓存队列,用户推送数据,先进入缓存队列,再由缓存队列的消费者取出交由真正的队列,同时也要留神重连工夫过长导致内存溢出、服务端重启导致替换队列还清空。所以咱们设计的channel应该满足以下几点

  • 内置推送替换队列
  • 隔离生产
  • 提供重启机制

    • 重启channel
    • 重启生产
    • 重启推送
  • 提供优雅敞开(关机时候须要保障推送替换队列齐全推送结束)

代码案例


package go_amqp

import (
    "encoding/json"
    "errors"
    "fmt"
    "github.com/lazychanger/go-amqp/tools"
    "github.com/streadway/amqp"
    "log"
    "sync"
    "time"
)

type Queue struct {
    sync.Mutex
    // 队列名
    name string
    // 交换机名
    exchangeName string
    // 队列绑定交换机路由KEY
    routingKey string
    // 交换机模式
    exchange string
    // 最大内置队列长度
    maxOverstock int64

    conn    *amqp.Connection
    channel *amqp.Channel

    // 1.为什么不采纳slice。因为golang的slice咱们不不便作为队列操作,每次入出都会引发大量的内存操作。所以采纳 map 类型
    // 2.为什么采纳sync.map。因为入与出是同时执行,异步操作,如果间接采纳map类型,会导致脏读、幻读等问题,咱们须要对map类型的读写须要锁,而golang有内置残缺的读写锁map模型
    // 3.采纳map类型当前,须要咱们本人保护头出尾入以及长度,所以这里有qMaxI、qMinI、ql进行数据记录
    q *sync.Map
    // 最大index
    qMaxI int64
    // 最小index
    qMinI int64
    // 内置队列长度
    ql int64
    // 是否启动开释库存
    isReleaseStock bool
    // 是否重启生产
    isReloadConsume int
    // 是否进行生产
    isStopConsume bool
    // 是否筹备好
    already int

    close bool
    // 消费者
    cs []consume
}

type MessageStatus int8

const (
    AlreadyStop    = 0
    AlreadyReload  = 1
    AlreadySucceed = 2

    MessageStatusSucceed MessageStatus = 1
    MessageStatusError   MessageStatus = 2
    MessageStatusRequeue MessageStatus = 3
)

// 提供重启服务
func (q *Queue) Reload(conn *amqp.Connection) error {
    // 先敞开状态
    q.conn = conn
    // 标识重启中
    q.already = AlreadyReload
    return q.init()
}

// 初始化操作
func (q *Queue) init() error {
    var (
        ch  *amqp.Channel
        err error
    )
    log.Println("[amqp] queue init start")
    q.Lock()
    ch, err = q.conn.Channel()
    if err != nil {
        return err
    }

    // 创立交换机
    if err = ch.ExchangeDeclare(q.exchangeName, q.exchange, false, false, false, false, nil); err != nil {
        return errors.New(fmt.Sprintf("[amqp] exchange declare failed, err: %s", err))
    }
    // 创立队列
    if _, err = ch.QueueDeclare(q.name, false, false, false, false, nil); err != nil {
        return errors.New(fmt.Sprintf("[amqp] queue declare failed, err: %s", err))
    }
    // 交换机绑定队列
    if err = ch.QueueBind(q.name, q.routingKey, q.exchangeName, false, nil); err != nil {
        return errors.New(fmt.Sprintf("[amqp] queue bind failed, err: %s", err))
    }
    // 管道替换
    q.channel = ch
    // 告知曾经筹备结束
    if q.already == AlreadyReload {
        q.already = AlreadySucceed

        // 从新触发生产
        q.reloadConsume()

        if !q.isReleaseStock && q.ql > 0 {
            log.Println("init release stock")
            // 从新触发
            go q.releaseStock()
        }
    }

    q.Unlock()

    log.Println("[amqp] queue init end")

    return nil
}

// 对推送简略封装一下,使json对象推送更加简便
func (q *Queue) PublishJson(v interface{}) error {
    body, err := json.Marshal(v)
    if err != nil {
        return err
    }

    return q.Publish(body)
}

// 原始推送,要求[]byte
func (q *Queue) Publish(data []byte) error {
    // 查看库存,避免溢出
    if q.maxOverstock > 0 && q.ql > q.maxOverstock {
        return publishOverstock
    }

    if q.close {
        return errors.New("service closing")
    }

    // 启动锁,避免开释库存时候,独特操作导致脏写
    q.Lock()
    // 避免并发map创立
    if q.q == nil {
        q.q = &sync.Map{}
    }
    // 减少最大下标
    q.qMaxI++
    // 减少最大长度
    q.ql++
    // 存储值
    q.q.Store(q.qMaxI, data)
    q.Unlock()

    // 查看开释库存是否启动,未启动并且channel曾经筹备结束,就启动库存开释
    if !q.isReleaseStock && q.already == AlreadySucceed {
        go q.releaseStock()
    }

    log.Printf("published,now %d", q.ql)
    return nil
}

// 内置队列生产,库存开释
func (q *Queue) releaseStock() {
    // 判断是否反复启动
    q.Lock()
    if q.isReleaseStock {
        q.Unlock()
        log.Println("[amqp] release stock already run")
        return
    }
    // 标记服务曾经启动
    q.isReleaseStock = true
    q.Unlock()
    log.Println("[amqp] release stock")
    for {
        // 如果库存为空或者channel还未筹备好就敞开循环
        if q.ql == 0 || q.already != AlreadySucceed {
            break
        }
        // 先将以后长度取出,避免循环时候批改,变成脏读
        l := q.ql
        // 理论启动以后轮次库存开释
        for i := int64(0); i < l; i++ {
            if q.already != AlreadySucceed {
                break
            }
            // 对库存
            q.Lock()
            log.Printf("internal queues length: %d", q.ql)
            // 对库存最小下标进行+1
            q.qMinI++
            // 缩小库存最大数
            q.ql--
            // 锁期间,顶部索引不变
            min := q.qMinI
            q.Unlock()

            // 读取内容
            body, has := q.q.Load(min)
            // 预防脏读
            if has && body != nil {
                // 推送
                _ = q.publish(body.([]byte))
            } else {
                log.Println("[amqp] data error")
            }
            // 开释map空间
            q.q.Delete(min)
        }

        // 本轮库存开释曾经完结,提早执行3秒后执行下一轮
        ticker := time.NewTicker(time.Second * 3)
        select {
        case <-ticker.C:
            ticker.Stop()
        }
    }
    // 标记敞开
    q.isReleaseStock = false
}

// 理论channel向队列发送数据
func (q *Queue) publish(data []byte) error {
    return q.channel.Publish(q.exchangeName, q.routingKey, false, false, amqp.Publishing{
        Body: data,
    })
}

// 增加生产内容,先存储,期待服务启动当前触发
func (q *Queue) Consume(name string, consumeFunc ConsumeFunc, repeat int) error {
    q.cs = append(q.cs, consume{
        repeat:      tools.IF(repeat <= 0, 1, repeat).(int),
        consumeFunc: consumeFunc,
        name:        name,
    })
    // 尝试启动生产
    q.reloadConsume()

    return nil
}

// 暂停生产
func (q *Queue) StopConsume() {
    q.isStopConsume = true
}

// 启动生产
func (q *Queue) StartConsume() {
    q.isStopConsume = false
    q.reloadConsume()
}

// 理论触发生产
func (q *Queue) reloadConsume() {
    // 如果未启动,间接返回
    if q.already != AlreadySucceed || q.isStopConsume {
        return
    }
    // 推送重启生产
    q.isReloadConsume++
    // 记录以后生产重启值
    reloadConsume := q.isReloadConsume
    // 记录以后channel重启值
    for i, c := range q.cs {
        // 并发生产
        for l := 0; l < c.repeat; l++ {
            name := fmt.Sprintf("%s_%d-%d", c.name, i, l)
            msgs, err := q.channel.Consume(q.name, name, false, false, false, false, nil)
            if err != nil {
                log.Fatalf("[AMQP] customer register err;name: %s, %s", name, err)
            } else {
                go func(c ConsumeFunc, consumeName string, reloadConsume int) {
                    for msg := range msgs {
                        switch c(msg.Body, consumeName) {
                        case MessageStatusSucceed:
                        case MessageStatusError:
                            _ = msg.Ack(true)
                            break
                        case MessageStatusRequeue:
                            _ = msg.Reject(true)
                            break
                        }
                        // 如果channel重启或者生产重启,都完结以后生产,避免溢出,或者正在敞开
                        if q.already != AlreadySucceed || q.isReloadConsume != reloadConsume || q.close || q.isStopConsume {
                            break
                        }
                    }
                }(c.consumeFunc, name, reloadConsume)
            }
        }
    }
}

// 优雅重启
func (q *Queue) Close() error {
    // 先标记敞开
    q.close = true

    retry := 0

    for {

        if q.ql > 0 {
            if q.already == AlreadySucceed {
                if q.isReleaseStock == false {
                    q.releaseStock()
                }
            } else {
                retry++
                // 如果channel没有筹备好,内置队列也没有开释完,则重试三次,三次还没有解决好,就放弃重试
                if retry > 3 {
                    break
                }
            }
        } else {
            break
        }

        ticker := time.NewTicker(time.Second / 2)
        select {
        case <-ticker.C:
            ticker.Stop()
        }
    }

    return q.channel.Close()
}

type consume struct {
    name        string
    consumeFunc ConsumeFunc
    repeat      int
}
type ConsumeFunc func(data []byte, name string) MessageStatus

channel治理

思考

下面介绍了单个channel外部状态保护,那么当初就要开始对这些channel进行治理。治理内容如下:

  • channel注册
  • 断线重启
  • 优雅敞开

代码

package go_amqp

import (
    "github.com/streadway/amqp"
    "log"
    "sync"
    "time"
)

type Connection struct {
    // 配置
    config *config
    // amqp连贯
    conn *amqp.Connection
    // 连贯状态
    isConnected bool
    // 关机提醒
    done chan bool
    // amqp闪断告诉
    notifyClose chan *amqp.Error
    // 多队列(channel),此处认为一个channel治理一条队列
    qs []*Queue
}


type config struct {
    // 连贯AMQP DSN构建驱动
    driver Driver

    // 最大音讯重发次数
    maxSendRetries int
    // 最大重连次数
    maxReconnects int
    // 重连延迟时间
    reconnectDelay time.Duration

    // 最大发送积压数
    maxOverstock int64
}


func New(opts ...Option) (*Connection, error) {
    // 建设默认连贯对象
    conn := &Connection{
        // 生成默认配置
        config: &config{
            reconnectDelay: time.Second,
            maxReconnects:  0,
            maxSendRetries: 3,
        },
    }

    for _, opt := range opts {
        // 配置写入
        opt(conn.config)
    }

    // 根底必须配置查看
    if conn.config.driver == nil {
        return nil, missingConnectionDsnDriver
    }

    // 断线重连
    go conn.handleReconnect()

    return conn, nil
}

func (c *Connection) handleReconnect() {
    tryConnects := 0
    for {
        c.isConnected = false
        if err := c.connect(); err != nil {
            if c.config.maxReconnects > 0 && tryConnects > c.config.maxReconnects {
                log.Fatalf("[AMQP] Reconnection times exceeded!(%d)", tryConnects)
                return
            }

            tryConnects += 1
            log.Printf("Failed to connect. %s Retrying...(%d)", err, tryConnects)
            time.Sleep(c.config.reconnectDelay)
        } else {
            // clear try connect
            tryConnects = 0
        }
        // 期待下一步信号告诉
        select {
        case <-c.done:
            return
        case <-c.notifyClose:
        }
    }
}

//
func (c *Connection) connect() error {
    url := c.config.driver.Url()
    log.Printf("[amqp] connected. %s", url)
    conn, err := amqp.Dial(url)
    if err != nil {
        return err
    }
    c.conn = conn

    c.notifyClose = make(chan *amqp.Error)
    c.conn.NotifyClose(c.notifyClose)

    c.isConnected = true
    // channel重连
    c.queueReconnect()
    return nil
}

// channel重连
// 当连贯闪断当前,须要从新建设新的连贯,所以,所有的channel也须要进行新的连贯
func (c *Connection) queueReconnect() {
    for _, q := range c.qs {
        if err := q.Reload(c.conn); err != nil {
            log.Println(err)
        }
    }
}

// 优雅敞开,
// 敞开channel
// 敞开闪断重连机制
// 敞开connection
func (c *Connection) Close() error {
    if c.isConnected {
        return alreadyClosed
    }

    var wg sync.WaitGroup
    // 批量敞开
    for i := 0; i < len(c.qs); i++ {
        wg.Add(1)
        go func(idx int, group *sync.WaitGroup) {
            _ = c.qs[idx].Close()
            group.Done()
        }(i, &wg)
    }

    wg.Wait()

    _ = c.conn.Close()

    // 敞开
    close(c.done)
    return nil
}

// 生成单Channel治理,让一个channel治理一条队列的发送与生产
func (c *Connection) Queue(name, exchange, routingKey string) (*Queue, error) {
    q := &Queue{
        name:         name,
        exchange:     amqp.ExchangeDirect,
        exchangeName: exchange,
        routingKey:   routingKey,
        maxOverstock: c.config.maxOverstock,
    }

    if c.isConnected == true {
        if err := q.Reload(c.conn); err != nil {
            return nil, err
        }
    }
    c.qs = append(c.qs, q)
    return q, nil
}

刚开始写一些技术分享的,很多中央或者构造可能写的比拟毛糙,欢送各位执教、交换。如果有不理解或者新的想法,也能够评论留言沟通。thanks!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理