连贯RabbitMQ

建设连贯

func Dial(url string) (*Connection, error)
  • url:RabbitMQ服务端url,例如:amqp://admin:admin@127.0.0.1:5672/

创立信道

func (c *Connection) Channel() (*Channel, error)

Connection能够用来创立多个Channel实例,然而Channel实例不能在线程间共享,应用程序应该为每一个线程开拓一个Channel。某些状况下Channel的操作能够并发运行,然而在其余状况下会导致通信谬误,同时也会影响发送方确认(publisher confirm)机制的运行,所以多线程间共享Channel实例是非线程平安的。

应用交换器和队列

申明交换器

func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
  • name:交换器的名称。
  • kind:交换器的类型,常见的如 fanout、direct、topic。
  • durable:设置是否长久化。设置为长久化能够将交换器存盘,在服务器重启的时候不会失落相干信息。
  • autoDelete:设置是否主动删除。主动删除的前提是至多有一个队列或者交换器与这个交换器绑定,当所有与这个交换器绑定的队列或者交换器都与此解绑时,会主动删除该交换器。留神不能谬误地把这个参数了解为“当与此交换器连贯的客户端都断开时,RabbitMQ会主动删除本交换器”。
  • internal:设置是否是内置交换器。如果设置为true,则示意是内置的交换器,客户端程序无奈间接发送音讯到这个交换器中,只能通过交换器路由到交换器这种形式。
  • noWait:是否非阻塞期待服务器返回。设置为true则不会阻塞期待RabbitMQ Server的返回信息(实际上服务器也不会返回),此时立刻应用这个交换器可能会导致异样,倡议设置为false。
  • args:其余一些结构化参数,比方alternate-exchange等。

还有另一个相似的参数齐全一样的办法ExchangeDeclarePassive,这个办法次要用来检测相应的交换器是否存在,如果存在则失常返回,不存在则抛出异样,同时Channel也会被敞开。

删除交换器

func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error
  • name:交换器名称。
  • ifUnused:设置是否在交换器没有被应用的状况下删除。如果设置为true,则只有在此交换器没有被应用的状况下才会被删除;如果设置为false,则无论如何这个交换器都要被删除。
  • noWait:是否非阻塞期待服务器返回,倡议设置为false。

申明队列

func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
  • name:队列的名称。
  • durable:设置是否长久化。长久化的队列会存盘,在服务器重启的时候能够保障不失落队列及音讯(不失落是绝对的,如果宕机时有音讯没来得及存盘,还是会失落)。
  • autoDelete:设置是否主动删除。主动删除的前提是至多有一个消费者连贯到这个队列,之后所有与这个队列连贯的消费者都断开时,才会主动删除。不能把这个参数谬误地了解为“当连贯到此队列的所有客户端断开时,这个队列会主动删除”,因为生产者客户端创立这个队列,或者没有消费者客户端与这个队列连贯时,都不会主动删除这个队列。
  • exclusive:设置是否排他。如果一个队列被申明为排他队列,该队列仅对首次申明它的连贯可见,并在连贯断开时主动删除。有几点须要留神:排他队列是基于连贯可见的,同一个连贯的不同信道是能够同时拜访同一连贯创立的排他队列;如果一个连贯曾经申明了一个排他队列,其余连贯不容许建设同名的排他队列;即便该队列是长久化的,一旦连贯敞开或者客户端退出,该排他队列都会被主动删除。这种队列实用于一个客户端同时发送和读取音讯的利用场景。
  • noWait:是否非阻塞期待服务器返回,倡议设置为false。
  • args:设置队列的其余一些参数,如x-message-ttl、x-expires等。

生产者和消费者都可能应用queueDeclare来申明一个队列,然而如果消费者在同一个信道上订阅了另一个队列,就无奈再申明队列了。必须先勾销订阅,而后将信道置为“传输”模式,之后能力申明队列。

同样这里还有一个参数齐全一样的queueDeclarePassive办法,这个办法用来检测相应的队列是否存在。如果存在则失常返回,如果不存在则抛出异样,同时Channel也会被敞开。

删除队列

func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
  • name:队列名称。
  • ifUnused:设置是否在队列没有被应用的状况下删除。
  • ifEmpty:设置是否在队列为空(没有任何音讯沉积)的状况下才可能删除。
  • noWait:是否非阻塞期待服务器返回,倡议设置为false。

清空队列

func (ch *Channel) QueuePurge(name string, noWait bool) (int, error)
  • name:队列名称。
  • noWait:是否非阻塞期待服务器返回,倡议设置为false。

队列绑定

func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
  • name:队列名称。
  • key:用来绑定队列和交换器的键。
  • exchange:交换器名称。
  • noWait:是否非阻塞期待服务器返回,倡议设置为false。
  • args:定义绑定的一些参数。

队列解绑

func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error
  • name:队列名称。
  • key:用来绑定队列和交换器的键。
  • exchange:交换器名称。
  • args:定义解绑的一些参数。

交换器绑定

func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args Table) error
  • destination:目标交换器,通常是外部交换器。
  • key:用来绑定源交换器和目标交换器的键。
  • source:源交换器。
  • nowait:是否非阻塞期待服务器返回,倡议设置为false。
  • args:定义绑定的一些参数。

生产者发送音讯至源交换器中,源交换器依据路由键找到与其匹配的目标交换器,并将音讯转发到给目标交换器,进而存储在目标交换器绑定的队列中。

发送音讯

func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
  • exchange:交换机名称,指明音讯须要发送到哪个交换器中。如果设置为空字符串,则音讯会被发送到默认交换器中。
  • key:路由键,交换器依据路由键将音讯存储到相应的队列中。
  • mandatory:倡议为false,前面有专门章节解说。
  • immediate :倡议为false,前面有专门章节解说。
  • msg:要发送的音讯,msg对应一个Publishing构造,Publishing构造外面有很多属性,除了正文的几个之外,大多很少应用。
type Publishing struct {    // headers类型交换器会应用    Headers Table        // 属性    ContentType     string    // 音讯类型    ContentEncoding string    // 音讯编码    DeliveryMode    uint8     // 是否长久化,0或1非长久化,2长久化    Priority        uint8     // 优先级,0 - 9    CorrelationId   string    // 关联id,有助于将RPC响应与申请相关联    ReplyTo         string    // RPC响应的回调地址,罕用于命名回调队列    Expiration      string        MessageId       string        Timestamp       time.Time     Type            string        UserId          string        AppId           string        // 音讯体    Body []byte}

生产音讯

RabbitMQ的生产模式分两种:推(Push)模式和拉(Pull)模式。

推模式

在推模式中,能够通过继续订阅的形式来生产音讯。

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
  • queue:队列名称。
  • consumer:消费者标签,用于辨别不同的消费者。
  • autoAck:设置是否主动确认,倡议设置成false。
  • exclusive:设置是否排他,排他示意以后队列只能给一个消费者应用。
  • noLocal:设置为true则示意不能将同一个Connection中生产者发送的音讯传送给这Connection中的消费者。
  • nowait:是否非阻塞期待服务器返回,倡议设置为false。
  • args:设置消费者的其余参数。

此函数返回一个单向Delivery类型通道,遍历该通道,有音讯则进行解决,没有则阻塞。如果须要勾销订阅的话,能够调用Cancel办法:

func (ch *Channel) Cancel(consumer string, noWait bool) error
  • consumer:消费者标签,用于辨别不同的消费者。
  • nowait:是否非阻塞期待服务器返回,倡议设置为false。

拉模式

func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)
  • queue:队列名称。
  • autoAck:设置是否主动确认,倡议设置成false。

推模式下将信道置为接管模式,直到勾销队列的订阅为止。在接管模式期间,RabbitMQ会一直地推送音讯给消费者(当然推送音讯的个数还是会受到Qos的限度)。如果只想从队列取得单条音讯而不是继续订阅,倡议还是应用拉模式。然而不能将Get办法放在一个循环里来代替推模式,这样做会重大影响RabbitMQ的性能。如果要实现高吞吐,消费者理当应用推模式。

生产端的确认与回绝

为了保障音讯从队列牢靠地达到消费者,RabbitMQ提供了音讯确认机制。消费者在订阅队列时,能够指定autoAck参数,当autoAck等于false时,RabbitMQ会期待消费者显式地回复确认信号后才删除音讯。当autoAck等于true时,RabbitMQ会主动把发送进来的音讯置为确认,而后删除,而不论消费者是否真正地生产到了这些音讯。

当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的音讯分成了两个局部:一部分是期待投递给消费者的音讯,另一部分是曾经投递给消费者,然而还没有收到消费者确认信号的音讯。如果RabbitMQ始终没有收到消费者的确认信号,并且生产此音讯的消费者曾经断开连接,则RabbitMQ会安顿该音讯从新进入队列,期待投递给下一个消费者(当然也有可能还是原来的那个消费者)。

RabbitMQ不会为未确认的音讯设置过期工夫,它判断此音讯是否须要从新投递给消费者的惟一根据是生产该音讯的消费者连贯是否曾经断开。

确认音讯

func (ch *Channel) Ack(tag uint64, multiple bool) errorfunc (d Delivery) Ack(multiple bool) error
  • tag:能够看作是音讯的编号。
  • multiple:设置是否批量确认。true示意确认编号为tag之前所有未被以后消费者确认的音讯,false示意仅确认编号为tag的音讯。

回绝音讯

func (ch *Channel) Reject(tag uint64, requeue bool) errorfunc (d Delivery) Reject(requeue bool) error 
  • tag:能够看作是音讯的编号。
  • requeue:设置是否将音讯重新加入队列。如果设置为true,则RabbitMQ会从新将这条音讯存入队列,以便能够发送给下一个订阅的消费者;如果设置为false,则立刻会将音讯从队列中移除。

批量回绝音讯

func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) errorfunc (d Delivery) Nack(multiple, requeue bool) error
  • tag:能够看作是音讯的编号。
  • multiple:设置是否批量回绝。
  • requeue:设置是否将音讯重新加入队列。

注:将Reject或者Nack办法中的requeue设置为false,能够启用“死信队列”的性能。死信队列能够通过检测被回绝或者未送达的音讯来追踪问题。

复原音讯

func (ch *Channel) Recover(requeue bool) error
  • requeue:设置是否将音讯重新加入队列。

此办法用来申请RabbitMQ从新发送还未被确认的音讯。如果requeue参数设置为true,则未被确认的音讯会被重新加入到队列中,这样对于同一条音讯来说,可能会被调配给与之前不同的消费者。如果requeue参数设置为false,那么同一条音讯会被调配给与之前雷同的消费者。

注:Delivery的确认和回绝相干办法实际上都调用了Channel的同名办法,个别状况下举荐应用Delivery的相干办法。

敞开连贯

func (ch *Channel) Close() errorfunc (c *Connection) Close() error

显式地敞开Channel是个好习惯,但不是必须的,在Connection敞开的时候,Channel也会主动敞开。