【BlogBook书】10、RabbitMQ:消息队列

科技   科技   2024-02-06 19:05   北京  

框架中重点使用RabbitMQ和Kafka作为消息队列的中间件工具,本章节说明RabbitMQ的具体使用方式。


框架已经完整的封装了EventBus事件总线,如果想要使用更加完善的消息队列的方式,建议使用事件总线,当然也可以直接如下文进行使用。


一、相关的依赖注入配置

// 服务注册builder.Services.AddRabbitMQSetup();

相关参数设置

"RabbitMQ": {   "Enabled": true,   "Connection": "xxxxx",   "UserName": "admin",   "Password": "admin",   "Port": "5672",   "RetryCount": 2 },


二、使用方式

1、在任意位置做消息的发送

private readonly IRabbitMQPersistentConnection _persistentConnection;  public ValuesController(IRabbitMQPersistentConnection persistentConnection) {    _persistentConnection = persistentConnection; }

/// <summary> /// 测试Rabbit消息队列发送 /// </summary> [HttpGet] [AllowAnonymous] public IActionResult TestRabbitMqPublish() { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } _persistentConnection.PublishMessage("Hello, RabbitMQ!", exchangeName: "blogcore", routingKey: "myRoutingKey"); return Ok(); }


2、在需要的地方进行消费

/// <summary> /// 测试Rabbit消息队列订阅 /// </summary> [HttpGet] [AllowAnonymous] public IActionResult TestRabbitMqSubscribe() {     if (!_persistentConnection.IsConnected)     {         _persistentConnection.TryConnect();     }
_persistentConnection.StartConsuming("myQueue"); return Ok(); }


3、具体的代码,请参考RabbitMQPersistentConnection.cs即可

 /// <summary> /// 发布消息 /// </summary> /// <param name="message"></param> /// <param name="exchangeName"></param> /// <param name="routingKey"></param> public void PublishMessage(string message, string exchangeName, string routingKey) {     using var channel = CreateModel();     channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, true);     var body = Encoding.UTF8.GetBytes(message);     channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body); }
/// <summary> /// 订阅消息 /// </summary> /// <param name="queueName"></param> public void StartConsuming(string queueName) { using var channel = CreateModel(); channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var consumer = new AsyncEventingBasicConsumer(channel); consumer.Received += new AsyncEventHandler<BasicDeliverEventArgs>( async (a, b) => { var Headers = b.BasicProperties.Headers; var msgBody = b.Body.ToArray(); var message = Encoding.UTF8.GetString(msgBody); await Task.CompletedTask; Console.WriteLine("Received message: {0}", message);
//bool Dealresult = await Dealer(b.Exchange, b.RoutingKey, msgBody, Headers); //if (Dealresult) channel.BasicAck(b.DeliveryTag, false); //else channel.BasicNack(b.DeliveryTag, false, true); } );
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine("Consuming messages..."); }

最终效果:


发布一条消息:

消费这个消息



BCVP代码创新社
专注于 NetCore 相关技术栈的推广,致力于前后端之间的完全分离,从壹开始,让每一个程序员都能从这里学有所成。
 最新文章