.NET Core中使用RabbitMQ正确方式首先甩官网:http://www.rabbitmq.com/然后是.NET Client链接:http://www.rabbitmq.com/dotnet.htmlGitHub仓库:https://github.com/rabbitmq/rabbitmq-dotnet-client下面直接进入正文,一共是两个主题:消费者怎么写?生产者怎么写?消费者在dotnet core mvc中,消费者肯定不能通过API或者其他的东西启动,理应是跟着程序一起启动的.所以…在dotnet core 2.0以上版本,我们直接用 IHostedService 接口实现..NET Core 中基于 IHostedService 实现后台定时任务Implementing background tasks in .NET Core 2.x webapps or microservices with IHostedService and the BackgroundService class直接上代码.// RabbitListener.cs 这个是基类,只实现注册RabbitMQ后到监听消息,然后每个消费者自己去重写RouteKey/QueueName/消息处理函数Processusing System;using System.Text;using System.Threading;using System.Threading.Tasks;using Microsoft.Extensions.DependencyInjection;using Microsoft.Extensions.Hosting;using Microsoft.Extensions.Logging;using Microsoft.Extensions.Options;using RabbitMQ.Client;using RabbitMQ.Client.Events;namespace Test.Listener{ public class RabbitListener : IHostedService { private readonly IConnection connection; private readonly IModel channel; public RabbitListener(IOptions<AppConfiguration> options) { try { var factory = new ConnectionFactory() { // 这是我这边的配置,自己改成自己用就好 HostName = options.Value.RabbitHost, UserName = options.Value.RabbitUserName, Password = options.Value.RabbitPassword, Port = options.Value.RabbitPort, }; this.connection = factory.CreateConnection(); this.channel = connection.CreateModel(); } catch (Exception ex) { Console.WriteLine($“RabbitListener init error,ex:{ex.Message}”); } } public Task StartAsync(CancellationToken cancellationToken) { Register(); return Task.CompletedTask; } protected string RouteKey; protected string QueueName; // 处理消息的方法 public virtual bool Process(string message) { throw new NotImplementedException(); } // 注册消费者监听在这里 public void Register() { Console.WriteLine($“RabbitListener register,routeKey:{RouteKey}”); channel.ExchangeDeclare(exchange: “message”, type: “topic”); channel.QueueDeclare(queue:QueueName, exclusive: false); channel.QueueBind(queue: QueueName, exchange: “message”, routingKey: RouteKey); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var result = Process(message); if (result) { channel.BasicAck(ea.DeliveryTag, false); } }; channel.BasicConsume(queue: QueueName, consumer: consumer); } public void DeRegister() { this.connection.Close(); } public Task StopAsync(CancellationToken cancellationToken) { this.connection.Close(); return Task.CompletedTask; } }}// 随便贴一个子类using System;using System.Text;using Microsoft.Extensions.Options;using Newtonsoft.Json.Linq;using RabbitMQ.Client;using RabbitMQ.Client.Events;using Microsoft.Extensions.DependencyInjection;using Microsoft.EntityFrameworkCore;using Microsoft.Extensions.Logging;namespace Test.Listener{ public class ChapterLister : RabbitListener { private readonly ILogger<RabbitListener> _logger; // 因为Process函数是委托回调,直接将其他Service注入的话两者不在一个scope, // 这里要调用其他的Service实例只能用IServiceProvider CreateScope后获取实例对象 private readonly IServiceProvider _services; public ChapterLister(IServiceProvider services, IOptions<AppConfiguration> options, ILogger<RabbitListener> logger) : base(options) { base.RouteKey = “done.task”; base.QueueName = “lemonnovelapi.chapter”; _logger = logger; _services = services; } public override bool Process(string message) { var taskMessage = JToken.Parse(message); if (taskMessage == null) { // 返回false 的时候回直接驳回此消息,表示处理不了 return false; } try { using (var scope = _services.CreateScope()) { var xxxService = scope.ServiceProvider.GetRequiredService<XXXXService>(); return true; } } catch (Exception ex) { _logger.LogInformation($“Process fail,error:{ex.Message},stackTrace:{ex.StackTrace},message:{message}”); _logger.LogError(-1, ex, “Process fail”); return false; } } }}然后,记住….注入到Startup.cs的时候,使用AddHostedService services.AddHostedService<ChapterLister>();消费者就这样玩了.生产者咋玩呢?这个其实更简单.using System;using System.Net;using Newtonsoft.Json.Linq;using RestSharp;using Microsoft.Extensions.Logging;using Microsoft.Extensions.Options;using RabbitMQ.Client;using Newtonsoft.Json;using System.Text;namespace Test.SDK{ public class RabbitMQClient { private readonly IModel _channel; private readonly ILogger _logger; public RabbitMQClient(IOptions<AppConfiguration> options, ILogger<RabbitMQClient> logger) { try { var factory = new ConnectionFactory() { HostName = options.Value.RabbitHost, UserName = options.Value.RabbitUserName, Password = options.Value.RabbitPassword, Port = options.Value.RabbitPort, }; var connection = factory.CreateConnection(); _channel = connection.CreateModel(); } catch (Exception ex) { logger.LogError(-1, ex, “RabbitMQClient init fail”); } _logger = logger; } public virtual void PushMessage(string routingKey, object message) { _logger.LogInformation($“PushMessage,routingKey:{routingKey}”); _channel.QueueDeclare(queue: “message”, durable: false, exclusive: false, autoDelete: false, arguments: null); string msgJson = JsonConvert.SerializeObject(message); var body = Encoding.UTF8.GetBytes(msgJson); _channel.BasicPublish(exchange: “message”, routingKey: routingKey, basicProperties: null, body: body); } }}切记注入实例的时候用单例模式.services.AddSingleton<RabbitMQClient, RabbitMQClient>();全文完…