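The framework mainly uses RabbitMQ and Kafka as its message queue middleware. This section describes how to use RabbitMQ.
The framework already provides a complete EventBus wrapper. If you need fuller message-queue functionality, the event bus is the recommended route, but you can also use RabbitMQ directly as described in the rest of this section.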
I. Dependency injection configuration
// Register the service
builder.Services.AddRabbitMQSetup();
Configuration parameters
"RabbitMQ": {
  "Enabled": true,
  "Connection": "xxxxx",
  "UserName": "admin",
  "Password": "admin",
  "Port": "5672",
  "RetryCount": 2
},
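To make the shape of these settings concrete, here is a minimal sketch that reads the "RabbitMQ" section into a typed object using only System.Text.Json. The names RabbitMqSettings, RabbitMqSettingsReader and PortNumber are hypothetical and exist only for illustration; the framework's own options type and binding code may look different. Note that "Port" is written as a JSON string in the sample, so the sketch converts it explicitly.

```csharp
using System;
using System.Globalization;
using System.Text.Json;

// Hypothetical settings type mirroring the "RabbitMQ" section;
// not the framework's actual options class.
public sealed class RabbitMqSettings
{
    public bool Enabled { get; set; }
    public string Connection { get; set; } = "";
    public string UserName { get; set; } = "";
    public string Password { get; set; } = "";
    public string Port { get; set; } = "";   // kept as a string, as in the sample config
    public int RetryCount { get; set; }

    // Convenience accessor: converts the string port to an integer.
    public int PortNumber => int.Parse(Port, CultureInfo.InvariantCulture);
}

public static class RabbitMqSettingsReader
{
    // Expects a JSON document whose root object contains a "RabbitMQ" property.
    public static RabbitMqSettings Parse(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var section = doc.RootElement.GetProperty("RabbitMQ");
        return JsonSerializer.Deserialize<RabbitMqSettings>(section.GetRawText())
               ?? throw new InvalidOperationException("The RabbitMQ section is empty.");
    }
}
```

Calling RabbitMqSettingsReader.Parse on the full settings document returns an object whose Enabled flag can be checked before any connection is attempted.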
II. Usage
1. Send a message from anywhere
private readonly IRabbitMQPersistentConnection _persistentConnection;

public ValuesController(IRabbitMQPersistentConnection persistentConnection)
{
    _persistentConnection = persistentConnection;
}
/// <summary>
/// Test publishing a message to the RabbitMQ queue
/// </summary>
[HttpGet] // assumed: the original attribute names did not survive in the source
public IActionResult TestRabbitMqPublish()
{
    if (!_persistentConnection.IsConnected)
    {
        _persistentConnection.TryConnect();
    }
    _persistentConnection.PublishMessage("Hello, RabbitMQ!", exchangeName: "blogcore", routingKey: "myRoutingKey");
    return Ok();
}
2. Consume messages wherever needed
/// <summary>
/// Test subscribing to the RabbitMQ queue
/// </summary>
[HttpGet] // assumed: the original attribute names did not survive in the source
public IActionResult TestRabbitMqSubscribe()
{
    if (!_persistentConnection.IsConnected)
    {
        _persistentConnection.TryConnect();
    }
    _persistentConnection.StartConsuming("myQueue");
    return Ok();
}
3. For the full implementation, see RabbitMQPersistentConnection.cs
/// <summary>
/// Publish a message
/// </summary>
/// <param name="message">Message text, sent as UTF-8 bytes</param>
/// <param name="exchangeName">Name of the direct exchange to publish to</param>
/// <param name="routingKey">Routing key used to route the message</param>
public void PublishMessage(string message, string exchangeName, string routingKey)
{
    using var channel = CreateModel();
    // Declare a durable direct exchange (idempotent if it already exists with the same settings)
    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true);
    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
}
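PublishMessage passes basicProperties: null, so messages are published without a delivery mode and are treated as transient even though the exchange is durable. If messages need to survive a broker restart, they can be marked persistent. The following is a minimal sketch, assuming RabbitMQ.Client 6.x and a channel supplied by the caller; PersistentPublishSketch and its methods are hypothetical names, not part of the framework.

```csharp
using System.Text;
using RabbitMQ.Client;

// Hypothetical helper, not the framework's API.
public static class PersistentPublishSketch
{
    // Encodes the message text the same way PublishMessage does.
    public static byte[] Encode(string message) => Encoding.UTF8.GetBytes(message);

    public static void Publish(IModel channel, string exchangeName, string routingKey, string message)
    {
        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true);

        // Mark the message persistent so a durable queue can store it on disk.
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;
        properties.ContentType = "text/plain";

        channel.BasicPublish(exchange: exchangeName, routingKey: routingKey,
                             basicProperties: properties, body: Encode(message));
    }
}
```

Persistence only helps when the target queue is also durable, which matches the durable: true queue declaration used by StartConsuming.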
/// <summary>
/// Subscribe to messages
/// </summary>
/// <param name="queueName">Name of the queue to consume from</param>
public void StartConsuming(string queueName)
{
    // Not wrapped in `using`: disposing the channel when this method returns would cancel the consumer.
    var channel = CreateModel();
    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
    // AsyncEventingBasicConsumer requires ConnectionFactory.DispatchConsumersAsync = true (RabbitMQ.Client 5.x/6.x).
    var consumer = new AsyncEventingBasicConsumer(channel);
    consumer.Received += new AsyncEventHandler<BasicDeliverEventArgs>(
        async (a, b) =>
        {
            var headers = b.BasicProperties.Headers;
            var msgBody = b.Body.ToArray();
            var message = Encoding.UTF8.GetString(msgBody);
            await Task.CompletedTask;
            Console.WriteLine("Received message: {0}", message);
            // For manual acknowledgement, set autoAck to false below and use:
            //bool dealResult = await Dealer(b.Exchange, b.RoutingKey, msgBody, headers);
            //if (dealResult) channel.BasicAck(b.DeliveryTag, false);
            //else channel.BasicNack(b.DeliveryTag, false, true);
        }
    );
    // autoAck: true means the broker considers a message handled as soon as it is delivered.
    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    Console.WriteLine("Consuming messages...");
}
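Two details are worth spelling out. The publish example sends to the direct exchange "blogcore" with routing key "myRoutingKey", while the consumer reads from "myQueue"; on a direct exchange a message reaches a queue only through a binding with a matching routing key, and the source does not show where that binding is created. The commented-out lines also hint at manual acknowledgement. The sketch below combines both, assuming RabbitMQ.Client 6.x and a caller-supplied channel. ManualAckConsumerSketch, Handle and Start are hypothetical names, and the synchronous EventingBasicConsumer is swapped in for the async consumer so that the sketch does not depend on DispatchConsumersAsync.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical helper, not the framework's API.
public static class ManualAckConsumerSketch
{
    // Hypothetical handler: returns true when the message was processed successfully.
    public static bool Handle(string message)
    {
        Console.WriteLine("Received message: {0}", message);
        return !string.IsNullOrWhiteSpace(message);
    }

    // The caller owns the channel and must keep it open for as long as consumption should continue.
    public static void Start(IModel channel, string exchangeName, string queueName, string routingKey)
    {
        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true);
        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        // Bind the queue so messages published with this routing key are delivered to it.
        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey);
        // Deliver at most one unacknowledged message at a time to this consumer.
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (_, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            if (Handle(message))
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            else
                // requeue: true puts the message back on the queue for another attempt.
                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
        };
        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }
}
```

With requeue: true, a message that always fails is redelivered indefinitely, so a real handler would typically cap retries or route failures to a dead-letter exchange.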
Final result:
Publishing a message:
Consuming the message: