本文首发于码友网–《.NET 5/.NET Core应用程序中应用音讯队列中间件RabbitMQ示例教程》
前言
在现在的互联网时代,音讯队列中间件未然成为了分布式系统中重要的组件,它能够解决利用耦合,异步音讯,流量削峰等利用场景。借助音讯队列,咱们能够实现架构的高可用、高性能、可伸缩等个性,是大型分布式系统架构中不可或缺的中间件。
目前比拟风行的音讯队列中间件次要有:RabbitMQ, NATS, Kafka, ZeroMQ, Amazon SQS, ServiceStack, Apache Pulsar, RocketMQ, ActiveMQ, IBM MQ等等。
本文次要为大家分享的是在.NET 5应用程序中应用消息中间件RabbitMQ的示例教程。
筹备工作
在开始本文实战之前,请筹备以下环境:
- 消息中间件:RabbitMQ
- 开发工具:Visual Studio 2019或VS Code或Rider
笔者应用的开发工具是Rider 2021.1.2。
筹备解决方案和我的项目
创立我的项目
关上Rider,创立一个名为RabbitDemo的解决方案,再顺次创立三个基于.NET 5的我的项目,别离为:RabbitDemo.Shared
, RabbitDemo.Send
以及RabbitDemo.Receive
。
- RabbitDemo.Shared 我的项目次要用于寄存共用的RabbitMQ的连贯相干的类;
- RabbitDemo.Send 我的项目次要用于模仿生产者(发布者);
- RabbitDemo.Receive 我的项目次要用于模仿消费者(订阅者)
装置依赖包
首先,在以上创立的三个我的项目中别离应用包管理工具或者命令行工具装置RabbitMQ.Client
依赖包,如下:
编写RabbitDemo.Shared我的项目
RabbitDemo.Shared我的项目次要用于寄存共用的RabbitMQ的连贯相干的类。这里咱们创立一个RabbitChannel
类,而后在其中增加创立一些连贯RabbitMQ相干的办法,包含初始化RabbitMQ的连贯,敞开RabbitMQ连贯等,代码如下:
using RabbitMQ.Client;
namespace RabbitDemo.Shared
{
public class RabbitChannel
{
public static IModel Channel;
private static IConnection _connection;
public static IConnection Connection => _connection;
public static void Init()
{
_connection = new ConnectionFactory
{
HostName = "xxxxxx", // 你的RabbitMQ主机地址
UserName = "xxx", // RabbitMQ用户名
VirtualHost = "xxx", // RabbitMQ虚拟主机
Password = "xxxxxx" // RabbitMQ明码
}.CreateConnection();
Channel = _connection.CreateModel();
}
public static void CloseConnection()
{
if (Channel != null)
{
Channel.Close();
Channel.Dispose();
}
if (_connection != null)
{
_connection.Close();
_connection.Dispose();
}
}
}
}
编写音讯生产者
在我的项目RabbitDemo.Send中,援用我的项目RabbitDemo.Shared
,而后创立一个名为Send.cs
类,并在其中编写生产者的代码,如下:
using System;
using System.Text;
using System.Threading;
using RabbitDemo.Shared;
using RabbitMQ.Client;
namespace RabbitDemo.Send
{
public class Send
{
public static void Run()
{
for (var i = 0; i < 5; i++)
{
Publish(i);
Thread.Sleep(500);
}
}
private static void Publish(int index)
{
RabbitChannel.Channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var message = $"Hello World from sender({index})!";
var body = Encoding.UTF8.GetBytes(message);
RabbitChannel.Channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
以上示例模仿的是一个生产者一次生产了5条音讯,并将音讯存储到了RabbitMQ的音讯队列中。
批改此我的项目中的Program.cs
代码如下:
using System;
using RabbitDemo.Shared;
namespace RabbitDemo.Send
{
static class Program
{
static void Main(string[] args)
{
Console.WriteLine("按任意键退出.");
RabbitChannel.Init();
Send.Run();
Console.ReadKey();
Console.WriteLine("正在敞开连贯...");
RabbitChannel.CloseConnection();
Console.WriteLine("连贯已敞开,退出程序.");
}
}
}
编写音讯消费者
在我的项目RabbitDemo.Receive中,援用我的项目RabbitDemo.Shared
,而后创立一个名为Receive.cs
的类,并在其中编写消费者的代码,如下:
using System;
using System.Linq;
using System.Text;
using RabbitDemo.Shared;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitDemo.Receive
{
public class Receive
{
public static void Run()
{
RabbitChannel.Channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(RabbitChannel.Channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
RabbitChannel.Channel.BasicAck(deliveryTag:ea.DeliveryTag,multiple:false);
};
RabbitChannel.Channel.BasicConsume(queue: "hello",
autoAck: false,
consumer: consumer);
}
}
}
批改此我的项目中的Program.cs
代码如下:
using System;
using RabbitDemo.Shared;
namespace RabbitDemo.Receive
{
static class Program
{
static void Main(string[] args)
{
Console.WriteLine("按任意键退出.");
RabbitChannel.Init();
Receive.Run();
Console.ReadKey();
Console.WriteLine("正在敞开连贯...");
RabbitChannel.CloseConnection();
Console.WriteLine("连贯已敞开,退出程序.");
}
}
}
运行
别离生成和运行生产者和消费者我的项目,运行成果如下:
从上图能够看出,整个演示过程,RabbitMQ的音讯音讯是十分即时的,消费者简直能够实时地生产生产者生产的音讯。
须要确认的音讯队列
在下面的生产者/消费者示例中,音讯一经消费者生产,RabbitMQ会立刻将音讯从队列中移除。但在某些场景中,咱们须要消费者确认音讯被正确生产后再将其从队列中移除,RabbitMQ提供了生产确认的性能,上面咱们来应用示例演示。
首先在RabbitDemo.Send中创立名为Worker.cs
的类,并编写如下代码:
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using System.Threading;
using RabbitDemo.Shared;
using RabbitMQ.Client;
namespace RabbitDemo.Send
{
public class Worker
{
public static void Run()
{
for (var i = 0; i < 5; i++)
{
Thread.Sleep(1000);
Publish(i);
}
}
private static void Publish(int index)
{
var args = new Dictionary<string, object>
{
{"x-max-priority", 0}
};
RabbitChannel.Channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: args);
var message = $"Hello({index}) at {DateTime.Now.ToString(CultureInfo.InvariantCulture)}";
var body = Encoding.UTF8.GetBytes(message);
var properties = RabbitChannel.Channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = new Dictionary<string, object>
{
{"order-no", $"1001{index}"}
};
RabbitChannel.Channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
此示例模仿生和了5条音讯,并将音讯寄存到了RabbitMQ的task_queue
队列中,其中咱们还通过QueueDeclare()
办法的arguments
参数设置了队列的优先级,也通过basicProperties
参数增加了自定义的音讯头(Header)参数order-no
。
接着,在我的项目RabbitDemo.Receive我的项目中创立一个名为Task.cs
的类,并编写如下的消费者代码:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using RabbitDemo.Shared;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitDemo.Receive
{
public class Task
{
public static void Run()
{
Console.WriteLine(" [*] Waiting for messages.");
Consume();
}
private static void Consume()
{
var args = new Dictionary<string, object>
{
{"x-max-priority", 0}
};
RabbitChannel.Channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: args);
RabbitChannel.Channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(RabbitChannel.Channel);
consumer.Received += (sender, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
var messageBuilder = new StringBuilder();
foreach (var headerKey in ea.BasicProperties.Headers.Keys)
{
var value = ea.BasicProperties.Headers[headerKey] as byte[];
messageBuilder.Append("Header key: ")
.Append(headerKey)
.Append(", value: ")
.Append(Encoding.UTF8.GetString(value))
.Append("; ");
}
Console.WriteLine($"Customer properties:{messageBuilder.ToString()}");
var sleep = 6;
Thread.Sleep(sleep * 1000);
Console.WriteLine(" [x] Done");
((EventingBasicConsumer)sender)?.Model.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
RabbitChannel.Channel.BasicConsume(queue: "task_queue",
autoAck: false,
consumer: consumer);
}
}
}
在这段消费者代码中,咱们次要关注的是如何向RabbitMQ确认音讯的正确生产,与下面的Receive.cs
消费者相比,此示例中设置了
RabbitChannel.Channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
这里的设置示意:一个消费者最多只能一次拉取1条音讯
和
((EventingBasicConsumer)sender)?.Model.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
这条语句示意音讯已确认被失常生产。
批改生产者的Program.cs
:
using System;
using RabbitDemo.Shared;
namespace RabbitDemo.Send
{
static class Program
{
static void Main(string[] args)
{
Console.WriteLine("按任意键退出.");
RabbitChannel.Init();
Worker.Run();
Console.ReadKey();
Console.WriteLine("正在敞开连贯...");
RabbitChannel.CloseConnection();
Console.WriteLine("连贯已敞开,退出程序.");
}
}
}
批改生产的Program.cs
:
using System;
using RabbitDemo.Shared;
namespace RabbitDemo.Receive
{
static class Program
{
static void Main(string[] args)
{
Console.WriteLine("按任意键退出.");
RabbitChannel.Init();
Task.Run();
Console.ReadKey();
Console.WriteLine("正在敞开连贯...");
RabbitChannel.CloseConnection();
Console.WriteLine("连贯已敞开,退出程序.");
}
}
}
成果如下:
发表回复