共计 3255 个字符,预计需要花费 9 分钟才能阅读完成。
生产者代码:
package main | |
import ( | |
uuid "github.com/satori/go.uuid" | |
"github.com/streadway/amqp" | |
"github.com/wonderivan/logger" | |
"rmq/db/rmq" | |
"time" | |
) | |
const ( | |
DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机 | |
DeadLettersQueueName = "dlx_queue_packet" // 死信队列 | |
QueueName = "queue_packet" // 目标队列 | |
ExchangeName = "exchange_packet" // 目标交换机 | |
) | |
var ( | |
ch *amqp.Channel | |
err error | |
conn *amqp.Connection | |
queue amqp.Queue | |
dlxQueue amqp.Queue | |
) | |
func main() {if conn, err = rmq.GetConn(); err != nil {logger.Error("连接 RabbitMQ 服务器失败:%s", err.Error()) | |
return | |
} | |
defer conn.Close() | |
if ch, err = conn.Channel(); err != nil {logger.Error("获取 Channel 失败:%s", err.Error()) | |
return | |
} | |
defer ch.Close() | |
// 声明队列交换机 | |
if err = ch.ExchangeDeclare(ExchangeName, amqp.ExchangeFanout, true, false, false, false, nil); err != nil {logger.Error("声明业务交换机失败:%s", err.Error()) | |
return | |
} | |
// 创建死信交换机 | |
if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil {logger.Error("创建死信交换机:%s", err.Error()) | |
return | |
} | |
// 创建死信队列 | |
if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil {logger.Error("创建死信队列失败:%s", err.Error()) | |
return | |
} | |
// 创建业务队列 | |
if queue, err = ch.QueueDeclare(QueueName, true, false, false, false, amqp.Table{ | |
"x-message-ttl": 6000, // 消息过期时间 毫秒 | |
"x-dead-letter-exchange": DeadLettersExchangeName, // 死信交换机 | |
// "x-dead-letter-routing-key": "dlxKey", // 死信路由 key | |
}); err != nil {logger.Warn("创建业务队列失败:%s", err.Error()) | |
return | |
} | |
// 业务队列绑定交换机 | |
if err = ch.QueueBind(queue.Name, "", ExchangeName, false, nil); err != nil {logger.Error("绑定业务交换机失败:%s", err.Error()) | |
return | |
} | |
// 死信队列绑定死信交换机 | |
if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil {logger.Error("绑定死信交换机失败:%s", err.Error()) | |
} | |
for i := 1; i <= 10; i++ { | |
msg := amqp.Publishing{MessageId: uuid.NewV4().String(), | |
ContentType: "text/plain", | |
Body: []byte("红包退回"), | |
} | |
// 发布消息 | |
err = ch.Publish( | |
ExchangeName, | |
"", | |
false, | |
false, | |
msg, | |
) | |
if err != nil {logger.Error("发送失败: %s", err.Error()) | |
return | |
} else {logger.Info("发送成功:%s", msg.MessageId) | |
} | |
} | |
} |
消费者代码
package main | |
import ( | |
uuid "github.com/satori/go.uuid" | |
"github.com/streadway/amqp" | |
"github.com/wonderivan/logger" | |
"rmq/db/rmq" | |
"time" | |
) | |
const ( | |
DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机 | |
DeadLettersQueueName = "dlx_queue_packet" // 死信队列 | |
QueueName = "queue_packet" // 目标队列 | |
ExchangeName = "exchange_packet" // 目标交换机 | |
) | |
var ( | |
ch *amqp.Channel | |
err error | |
conn *amqp.Connection | |
queue amqp.Queue | |
dlxQueue amqp.Queue | |
) | |
func main() {if conn, err = rmq.GetConn(); err != nil {logger.Error("连接 RabbitMQ 服务器失败:%s", err.Error()) | |
return | |
} | |
defer conn.Close() | |
if ch, err = conn.Channel(); err != nil {logger.Error("获取 Channel 失败:%s", err.Error()) | |
return | |
} | |
defer ch.Close() | |
// 创建死信交换机 | |
if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil {logger.Error("创建死信交换机:%s", err.Error()) | |
return | |
} | |
// 创建死信队列 | |
if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil {logger.Error("创建死信队列失败:%s", err.Error()) | |
return | |
} | |
// 死信队列绑定死信交换机 | |
if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil {logger.Error("绑定死信交换机失败:%s", err.Error()) | |
} | |
msgList, err := ch.Consume(dlxQueue.Name, "", false, false, false, false, nil) | |
if err != nil {logger.Error("消费者监听失败:%s", err.Error()) | |
return | |
} | |
for { | |
select { | |
case message, ok := <-msgList: | |
if !ok {continue} | |
go func(msg amqp.Delivery) {logger.Info("messageID: %s", msg.MessageId) | |
logger.Info("messageBody: %s", msg.Body) | |
if err = msg.Ack(false); err != nil {logger.Error("确认消息失败") | |
} | |
}(message) | |
case <-time.After(time.Second): | |
} | |
} | |
} |
正文完