本文首发于码友网 –《.NET 5/.NET Core 应用程序中应用音讯队列中间件 RabbitMQ 示例教程》
前言
在现在的互联网时代,音讯队列中间件未然成为了分布式系统中重要的组件,它能够解决利用耦合,异步音讯,流量削峰等利用场景。借助音讯队列,咱们能够实现架构的高可用、高性能、可伸缩等个性,是大型分布式系统架构中不可或缺的中间件。
目前比拟风行的音讯队列中间件次要有:RabbitMQ, NATS, Kafka, ZeroMQ, Amazon SQS, ServiceStack, Apache Pulsar, RocketMQ, ActiveMQ, IBM MQ 等等。
本文次要为大家分享的是在.NET 5 应用程序中应用消息中间件 RabbitMQ 的示例教程。
筹备工作
在开始本文实战之前,请筹备以下环境:
- 消息中间件:RabbitMQ
- 开发工具:Visual Studio 2019 或 VS Code 或 Rider
笔者应用的开发工具是Rider 2021.1.2。
筹备解决方案和我的项目
创立我的项目
关上 Rider,创立一个名为 RabbitDemo 的解决方案,再顺次创立三个基于.NET 5 的我的项目,别离为:RabbitDemo.Shared
, RabbitDemo.Send
以及RabbitDemo.Receive
。
- RabbitDemo.Shared 我的项目次要用于寄存共用的 RabbitMQ 的连贯相干的类;
- RabbitDemo.Send 我的项目次要用于模仿生产者(发布者);
- RabbitDemo.Receive 我的项目次要用于模仿消费者(订阅者)
装置依赖包
首先,在以上创立的三个我的项目中别离应用包管理工具或者命令行工具装置 RabbitMQ.Client
依赖包,如下:
编写 RabbitDemo.Shared 我的项目
RabbitDemo.Shared我的项目次要用于寄存共用的 RabbitMQ 的连贯相干的类。这里咱们创立一个 RabbitChannel
类,而后在其中增加创立一些连贯 RabbitMQ 相干的办法,包含初始化 RabbitMQ 的连贯,敞开 RabbitMQ 连贯等,代码如下:
using RabbitMQ.Client;
namespace RabbitDemo.Shared
{
public class RabbitChannel
{
public static IModel Channel;
private static IConnection _connection;
public static IConnection Connection => _connection;
public static void Init()
{
_connection = new ConnectionFactory
{
HostName = "xxxxxx", // 你的 RabbitMQ 主机地址
UserName = "xxx", // RabbitMQ 用户名
VirtualHost = "xxx", // RabbitMQ 虚拟主机
Password = "xxxxxx" // RabbitMQ 明码
}.CreateConnection();
Channel = _connection.CreateModel();}
public static void CloseConnection()
{if (Channel != null)
{Channel.Close();
Channel.Dispose();}
if (_connection != null)
{_connection.Close();
_connection.Dispose();}
}
}
}
编写音讯生产者
在我的项目 RabbitDemo.Send 中,援用我的项目 RabbitDemo.Shared
,而后创立一个名为Send.cs
类,并在其中编写生产者的代码,如下:
using System;
using System.Text;
using System.Threading;
using RabbitDemo.Shared;
using RabbitMQ.Client;
namespace RabbitDemo.Send
{
public class Send
{public static void Run()
{for (var i = 0; i < 5; i++)
{Publish(i);
Thread.Sleep(500);
}
}
private static void Publish(int index)
{
RabbitChannel.Channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var message = $"Hello World from sender({index})!";
var body = Encoding.UTF8.GetBytes(message);
RabbitChannel.Channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine("[x] Sent {0}", message);
}
}
}
以上示例模仿的是一个生产者一次生产了 5 条音讯,并将音讯存储到了 RabbitMQ 的音讯队列中。
批改此我的项目中的 Program.cs
代码如下:
using System;
using RabbitDemo.Shared;
namespace RabbitDemo.Send
{
static class Program
{static void Main(string[] args)
{Console.WriteLine("按任意键退出.");
RabbitChannel.Init();
Send.Run();
Console.ReadKey();
Console.WriteLine("正在敞开连贯...");
RabbitChannel.CloseConnection();
Console.WriteLine("连贯已敞开,退出程序.");
}
}
}
编写音讯消费者
在我的项目 RabbitDemo.Receive 中,援用我的项目 RabbitDemo.Shared
,而后创立一个名为Receive.cs
的类,并在其中编写消费者的代码,如下:
using System;
using System.Linq;
using System.Text;
using RabbitDemo.Shared;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitDemo.Receive
{
public class Receive
{public static void Run()
{
RabbitChannel.Channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(RabbitChannel.Channel);
consumer.Received += (model, ea) =>
{var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("[x] Received {0}", message);
RabbitChannel.Channel.BasicAck(deliveryTag:ea.DeliveryTag,multiple:false);
};
RabbitChannel.Channel.BasicConsume(queue: "hello",
autoAck: false,
consumer: consumer);
}
}
}
批改此我的项目中的 Program.cs
代码如下:
using System;
using RabbitDemo.Shared;
namespace RabbitDemo.Receive
{
static class Program
{static void Main(string[] args)
{Console.WriteLine("按任意键退出.");
RabbitChannel.Init();
Receive.Run();
Console.ReadKey();
Console.WriteLine("正在敞开连贯...");
RabbitChannel.CloseConnection();
Console.WriteLine("连贯已敞开,退出程序.");
}
}
}
运行
别离生成和运行生产者和消费者我的项目,运行成果如下:
从上图能够看出,整个演示过程,RabbitMQ 的音讯音讯是十分即时的,消费者简直能够实时地生产生产者生产的音讯。
须要确认的音讯队列
在下面的生产者 / 消费者示例中,音讯一经消费者生产,RabbitMQ 会立刻将音讯从队列中移除。但在某些场景中,咱们须要消费者确认音讯被正确生产后再将其从队列中移除,RabbitMQ 提供了生产确认的性能,上面咱们来应用示例演示。
首先在 RabbitDemo.Send 中创立名为 Worker.cs
的类,并编写如下代码:
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using System.Threading;
using RabbitDemo.Shared;
using RabbitMQ.Client;
namespace RabbitDemo.Send
{
public class Worker
{public static void Run()
{for (var i = 0; i < 5; i++)
{Thread.Sleep(1000);
Publish(i);
}
}
private static void Publish(int index)
{
var args = new Dictionary<string, object>
{{"x-max-priority", 0}
};
RabbitChannel.Channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: args);
var message = $"Hello({index}) at {DateTime.Now.ToString(CultureInfo.InvariantCulture)}";
var body = Encoding.UTF8.GetBytes(message);
var properties = RabbitChannel.Channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = new Dictionary<string, object>
{{"order-no", $"1001{index}"}
};
RabbitChannel.Channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine("[x] Sent {0}", message);
}
}
}
此示例模仿生和了 5 条音讯,并将音讯寄存到了 RabbitMQ 的 task_queue
队列中,其中咱们还通过 QueueDeclare()
办法的 arguments
参数设置了队列的优先级,也通过 basicProperties
参数增加了自定义的音讯头 (Header) 参数order-no
。
接着,在我的项目 RabbitDemo.Receive 我的项目中创立一个名为 Task.cs
的类,并编写如下的消费者代码:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using RabbitDemo.Shared;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitDemo.Receive
{
public class Task
{public static void Run()
{Console.WriteLine("[*] Waiting for messages.");
Consume();}
private static void Consume()
{
var args = new Dictionary<string, object>
{{"x-max-priority", 0}
};
RabbitChannel.Channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: args);
RabbitChannel.Channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(RabbitChannel.Channel);
consumer.Received += (sender, ea) =>
{var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("[x] Received {0}", message);
var messageBuilder = new StringBuilder();
foreach (var headerKey in ea.BasicProperties.Headers.Keys)
{var value = ea.BasicProperties.Headers[headerKey] as byte[];
messageBuilder.Append("Header key:")
.Append(headerKey)
.Append(", value:")
.Append(Encoding.UTF8.GetString(value))
.Append(";");
}
Console.WriteLine($"Customer properties:{messageBuilder.ToString()}");
var sleep = 6;
Thread.Sleep(sleep * 1000);
Console.WriteLine("[x] Done");
((EventingBasicConsumer)sender)?.Model.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
RabbitChannel.Channel.BasicConsume(queue: "task_queue",
autoAck: false,
consumer: consumer);
}
}
}
在这段消费者代码中,咱们次要关注的是如何向 RabbitMQ 确认音讯的正确生产,与下面的 Receive.cs
消费者相比,此示例中设置了
RabbitChannel.Channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
这里的设置示意:一个消费者最多只能一次拉取 1 条音讯
和
((EventingBasicConsumer)sender)?.Model.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
这条语句示意音讯已确认被失常生产。
批改生产者的Program.cs
:
using System;
using RabbitDemo.Shared;
namespace RabbitDemo.Send
{
static class Program
{static void Main(string[] args)
{Console.WriteLine("按任意键退出.");
RabbitChannel.Init();
Worker.Run();
Console.ReadKey();
Console.WriteLine("正在敞开连贯...");
RabbitChannel.CloseConnection();
Console.WriteLine("连贯已敞开,退出程序.");
}
}
}
批改生产的Program.cs
:
using System;
using RabbitDemo.Shared;
namespace RabbitDemo.Receive
{
static class Program
{static void Main(string[] args)
{Console.WriteLine("按任意键退出.");
RabbitChannel.Init();
Task.Run();
Console.ReadKey();
Console.WriteLine("正在敞开连贯...");
RabbitChannel.CloseConnection();
Console.WriteLine("连贯已敞开,退出程序.");
}
}
}
成果如下: