关于rabbitmq:RabbitMQ基础客户端开发

21次阅读

共计 5802 个字符,预计需要花费 15 分钟才能阅读完成。

连贯 RabbitMQ

建设连贯

func Dial(url string) (*Connection, error)
  • url:RabbitMQ 服务端 url,例如:amqp://admin:admin@127.0.0.1:5672/

创立信道

func (c *Connection) Channel() (*Channel, error)

Connection 能够用来创立多个 Channel 实例,然而 Channel 实例不能在线程间共享,应用程序应该为每一个线程开拓一个 Channel。某些状况下 Channel 的操作能够并发运行,然而在其余状况下会导致通信谬误,同时也会影响发送方确认(publisher confirm)机制的运行,所以多线程间共享 Channel 实例是非线程平安的。

应用交换器和队列

申明交换器

func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
  • name:交换器的名称。
  • kind:交换器的类型,常见的如 fanout、direct、topic。
  • durable:设置是否长久化。设置为长久化能够将交换器存盘,在服务器重启的时候不会失落相干信息。
  • autoDelete:设置是否主动删除。 主动删除的前提是至多有一个队列或者交换器与这个交换器绑定,当所有与这个交换器绑定的队列或者交换器都与此解绑时,会主动删除该交换器。 留神不能谬误地把这个参数了解为“当与此交换器连贯的客户端都断开时,RabbitMQ 会主动删除本交换器”。
  • internal:设置是否是内置交换器。 如果设置为 true,则示意是内置的交换器,客户端程序无奈间接发送音讯到这个交换器中,只能通过交换器路由到交换器这种形式。
  • noWait:是否非阻塞期待服务器返回。 设置为 true 则不会阻塞期待 RabbitMQ Server 的返回信息(实际上服务器也不会返回),此时立刻应用这个交换器可能会导致异样,倡议设置为 false。
  • args:其余一些结构化参数,比方 alternate-exchange 等。

还有另一个相似的参数齐全一样的办法 ExchangeDeclarePassive,这个办法次要用来检测相应的交换器是否存在,如果存在则失常返回,不存在则抛出异样,同时 Channel 也会被敞开。

删除交换器

func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error
  • name:交换器名称。
  • ifUnused:设置是否在交换器没有被应用的状况下删除。如果设置为 true,则只有在此交换器没有被应用的状况下才会被删除;如果设置为 false,则无论如何这个交换器都要被删除。
  • noWait:是否非阻塞期待服务器返回,倡议设置为 false。

申明队列

func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
  • name:队列的名称。
  • durable:设置是否长久化。长久化的队列会存盘,在服务器重启的时候能够保障不失落队列及音讯(不失落是绝对的,如果宕机时有音讯没来得及存盘,还是会失落)。
  • autoDelete:设置是否主动删除。 主动删除的前提是至多有一个消费者连贯到这个队列,之后所有与这个队列连贯的消费者都断开时,才会主动删除。 不能把这个参数谬误地了解为“当连贯到此队列的所有客户端断开时,这个队列会主动删除”,因为生产者客户端创立这个队列,或者没有消费者客户端与这个队列连贯时,都不会主动删除这个队列。
  • exclusive:设置是否排他。 如果一个队列被申明为排他队列,该队列仅对首次申明它的连贯可见,并在连贯断开时主动删除。 有几点须要留神:排他队列是基于连贯可见的,同一个连贯的不同信道是能够同时拜访同一连贯创立的排他队列;如果一个连贯曾经申明了一个排他队列,其余连贯不容许建设同名的排他队列;即便该队列是长久化的,一旦连贯敞开或者客户端退出,该排他队列都会被主动删除。这种队列实用于一个客户端同时发送和读取音讯的利用场景。
  • noWait:是否非阻塞期待服务器返回,倡议设置为 false。
  • args:设置队列的其余一些参数,如 x -message-ttl、x-expires 等。

生产者和消费者都可能应用 queueDeclare 来申明一个队列,然而如果消费者在同一个信道上订阅了另一个队列,就无奈再申明队列了。必须先勾销订阅,而后将信道置为“传输”模式,之后能力申明队列。

同样这里还有一个参数齐全一样的 queueDeclarePassive 办法,这个办法用来检测相应的队列是否存在。如果存在则失常返回,如果不存在则抛出异样,同时 Channel 也会被敞开。

删除队列

func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
  • name:队列名称。
  • ifUnused:设置是否在队列没有被应用的状况下删除。
  • ifEmpty:设置是否在队列为空(没有任何音讯沉积)的状况下才可能删除。
  • noWait:是否非阻塞期待服务器返回,倡议设置为 false。

清空队列

func (ch *Channel) QueuePurge(name string, noWait bool) (int, error)
  • name:队列名称。
  • noWait:是否非阻塞期待服务器返回,倡议设置为 false。

队列绑定

func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
  • name:队列名称。
  • key:用来绑定队列和交换器的键。
  • exchange:交换器名称。
  • noWait:是否非阻塞期待服务器返回,倡议设置为 false。
  • args:定义绑定的一些参数。

队列解绑

func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error
  • name:队列名称。
  • key:用来绑定队列和交换器的键。
  • exchange:交换器名称。
  • args:定义解绑的一些参数。

交换器绑定

func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args Table) error
  • destination:目标交换器,通常是外部交换器。
  • key:用来绑定源交换器和目标交换器的键。
  • source:源交换器。
  • nowait:是否非阻塞期待服务器返回,倡议设置为 false。
  • args:定义绑定的一些参数。

生产者发送音讯至源交换器中,源交换器依据路由键找到与其匹配的目标交换器,并将音讯转发到给目标交换器,进而存储在目标交换器绑定的队列中。

发送音讯

func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
  • exchange:交换机名称,指明音讯须要发送到哪个交换器中。如果设置为空字符串,则音讯会被发送到默认交换器中。
  • key:路由键,交换器依据路由键将音讯存储到相应的队列中。
  • mandatory:倡议为 false,前面有专门章节解说。
  • immediate:倡议为 false,前面有专门章节解说。
  • msg:要发送的音讯,msg 对应一个 Publishing 构造,Publishing 构造外面有很多属性,除了正文的几个之外,大多很少应用。
type Publishing struct {
    // headers 类型交换器会应用
    Headers Table

        // 属性
    ContentType     string    // 音讯类型
    ContentEncoding string    // 音讯编码
    DeliveryMode    uint8     // 是否长久化,0 或 1 非长久化,2 长久化
    Priority        uint8     // 优先级,0 - 9
    CorrelationId   string    // 关联 id, 有助于将 RPC 响应与申请相关联
    ReplyTo         string    // RPC 响应的回调地址, 罕用于命名回调队列
    Expiration      string    
    MessageId       string    
    Timestamp       time.Time 
    Type            string    
    UserId          string    
    AppId           string    

    // 音讯体
    Body []byte}

生产音讯

RabbitMQ 的生产模式分两种:推(Push)模式和拉(Pull)模式。

推模式

在推模式中,能够通过继续订阅的形式来生产音讯。

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
  • queue:队列名称。
  • consumer:消费者标签,用于辨别不同的消费者。
  • autoAck:设置是否主动确认,倡议设置成 false。
  • exclusive:设置是否排他,排他示意以后队列只能给一个消费者应用。
  • noLocal:设置为 true 则示意不能将同一个 Connection 中生产者发送的音讯传送给这 Connection 中的消费者。
  • nowait:是否非阻塞期待服务器返回,倡议设置为 false。
  • args:设置消费者的其余参数。

此函数返回一个单向 Delivery 类型通道,遍历该通道,有音讯则进行解决,没有则阻塞。如果须要勾销订阅的话,能够调用 Cancel 办法:

func (ch *Channel) Cancel(consumer string, noWait bool) error
  • consumer:消费者标签,用于辨别不同的消费者。
  • nowait:是否非阻塞期待服务器返回,倡议设置为 false。

拉模式

func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)
  • queue:队列名称。
  • autoAck:设置是否主动确认,倡议设置成 false。

推模式下将信道置为接管模式,直到勾销队列的订阅为止。在接管模式期间,RabbitMQ 会一直地推送音讯给消费者(当然推送音讯的个数还是会受到 Qos 的限度)。 如果只想从队列取得单条音讯而不是继续订阅,倡议还是应用拉模式。然而不能将 Get 办法放在一个循环里来代替推模式,这样做会重大影响 RabbitMQ 的性能。如果要实现高吞吐,消费者理当应用推模式。

生产端的确认与回绝

为了保障音讯从队列牢靠地达到消费者,RabbitMQ 提供了音讯确认机制。消费者在订阅队列时,能够指定 autoAck 参数,当 autoAck 等于 false 时,RabbitMQ 会期待消费者显式地回复确认信号后才删除音讯。当 autoAck 等于 true 时,RabbitMQ 会主动把发送进来的音讯置为确认,而后删除,而不论消费者是否真正地生产到了这些音讯。

当 autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的音讯分成了两个局部:一部分是期待投递给消费者的音讯,另一部分是曾经投递给消费者,然而还没有收到消费者确认信号的音讯。 如果 RabbitMQ 始终没有收到消费者的确认信号,并且生产此音讯的消费者曾经断开连接,则 RabbitMQ 会安顿该音讯从新进入队列,期待投递给下一个消费者(当然也有可能还是原来的那个消费者)。

RabbitMQ 不会为未确认的音讯设置过期工夫,它判断此音讯是否须要从新投递给消费者的惟一根据是生产该音讯的消费者连贯是否曾经断开。

确认音讯

func (ch *Channel) Ack(tag uint64, multiple bool) error
func (d Delivery) Ack(multiple bool) error
  • tag:能够看作是音讯的编号。
  • multiple:设置是否批量确认。true 示意确认编号为 tag 之前所有未被以后消费者确认的音讯,false 示意仅确认编号为 tag 的音讯。

回绝音讯

func (ch *Channel) Reject(tag uint64, requeue bool) error
func (d Delivery) Reject(requeue bool) error 
  • tag:能够看作是音讯的编号。
  • requeue:设置是否将音讯重新加入队列。如果设置为 true,则 RabbitMQ 会从新将这条音讯存入队列,以便能够发送给下一个订阅的消费者;如果设置为 false,则立刻会将音讯从队列中移除。

批量回绝音讯

func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error
func (d Delivery) Nack(multiple, requeue bool) error
  • tag:能够看作是音讯的编号。
  • multiple:设置是否批量回绝。
  • requeue:设置是否将音讯重新加入队列。

注:将 Reject 或者 Nack 办法中的 requeue 设置为 false,能够启用“死信队列”的性能。死信队列能够通过检测被回绝或者未送达的音讯来追踪问题。

复原音讯

func (ch *Channel) Recover(requeue bool) error
  • requeue:设置是否将音讯重新加入队列。

此办法用来申请 RabbitMQ 从新发送还未被确认的音讯。 如果 requeue 参数设置为 true,则未被确认的音讯会被重新加入到队列中,这样对于同一条音讯来说,可能会被调配给与之前不同的消费者。如果 requeue 参数设置为 false,那么同一条音讯会被调配给与之前雷同的消费者。

注:Delivery 的确认和回绝相干办法实际上都调用了 Channel 的同名办法,个别状况下举荐应用 Delivery 的相干办法。

敞开连贯

func (ch *Channel) Close() error
func (c *Connection) Close() error

显式地敞开 Channel 是个好习惯,但不是必须的,在 Connection 敞开的时候,Channel 也会主动敞开。

正文完
 0