昏喽喽

vuePress-theme-reco Lio    2020 - 2025
昏喽喽

Choose mode

  • dark
  • auto
  • light
Home
Category
  • CentOS
  • Csharp
  • DataBase
  • DesignMode
  • Vue
  • FrontEnd
  • GLD
  • Kingdee
  • NetWork
Tags
TimeLine
Tools
  • Http请求
  • 日志配置
  • 加密解密
  • 验证码
  • Git命令
About
author-avatar

Lio

103

Articles

15

Tags

Home
Category
  • CentOS
  • Csharp
  • DataBase
  • DesignMode
  • Vue
  • FrontEnd
  • GLD
  • Kingdee
  • NetWork
Tags
TimeLine
Tools
  • Http请求
  • 日志配置
  • 加密解密
  • 验证码
  • Git命令
About

RabbitMQ使用

vuePress-theme-reco Lio    2020 - 2025

RabbitMQ使用

Lio 2022-03-19 MicroService

# 什么是RabbitMQ

RabbitMQ是消息队列。简称:MQ。MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

# 微服务系统中使用RabbitMQ

在微服务系统中,微服务之间通信,主要是通过Http或者gRPC通信。由于http/gRPC通信方式是同步通信,如果遇到了高并发,同步通信就会导致微服务系统性能瓶颈,所以,为了解决微服务性能瓶颈问题。需要将同步通信换成异步通信方式。因此。就选用使用消息队列。

# RabbitMQ落地

# 准备环境

  • 下载:RabbitMQ下载地址 (opens new window)、RabbitMQ 运行环境erlang下载地址 (opens new window)

  • 运行RabbitMQ

    1. 安装RabbitMQ管理插件
    rabbitmq-plugins enable rabbitmq_management
    
    1
    1. 启动RabbitMQ
    rabbitmq-server 
    
    1
    1. 查看RabbitMQ运行状态
    rabbitmqctl status
    
    1
  • 访问RabbitMQ管理界面。(http://localhost:15672)

# 项目实现

  • 添加Nuget包:RabbitMQ.Client

  • 创建RabbitMQ连接

    public static IConnection CreateRabbitMQConnection(IConfiguration configuration) 
    {
        RabbitMQConfig rabbitMQConfig = configuration.GetSection("RabbitMQConfig").Get<RabbitMQConfig>();
        // 1、创建连接工厂
        var factory = new ConnectionFactory()
        {
            HostName = rabbitMQConfig.HostName,
            Port = rabbitMQConfig.Port,
            Password = rabbitMQConfig.Password,
            UserName = rabbitMQConfig.UserName,
            VirtualHost = rabbitMQConfig.VirtualHost
        };
        return factory.CreateConnection();
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
  • 生产者发送消息

    var connection = RabbitMQConnectionFactory.CreateRabbitMQConnection(_configuration);
    var channel = connection.CreateModel();
    
    string productJson = JsonConvert.SerializeObject(productCreateDto);
    var body = Encoding.UTF8.GetBytes(productJson);
    
    #region 生产者
    //2、定义队列
    channel.QueueDeclare(queue: "Product_create",
                         durable: true, //消息持久化(防止rabbitmq宕机导致队列丢失风险)
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    
    
    //3、发送消息
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;//设置消息持久化
    channel.BasicPublish(exchange: "",
                         routingKey: "Product_create",
                         basicProperties: properties,
                         body: body);
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
  • 消费者接受消息

    var connection = RabbitMQConnectionFactory.CreateRabbitMQConnection(_configuration);
    var channel = connection.CreateModel();
    
    //工作队列(单消费者)
    channel.QueueDeclare(queue: "Product_create", durable: true, exclusive: false, autoDelete: false, arguments: null);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        Console.WriteLine($"model:{model}");
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine(" [x] 创建商品 {0}", message);
        // 业务逻辑执行完成后,手动消息确认
        // 自动确认机制缺陷:
        // 1、消息是否正常添加到数据库当中,所以需要使用手工确认
        channel.BasicAck(ea.DeliveryTag, true);
    
    };
    //有多个消费者时,如果不设置,则采用轮询的方式来消费
    channel.BasicQos(0, 1, false);//Qos(防止多个消费者,能力不一致,导致的系统质量问题,每一次一个消费者只能成功消费一个)
    channel.BasicConsume(queue: "Product_create",
                         autoAck: false,//关闭自动消息确认
                         consumer: consumer); 
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
  • 扇形交换机(fanout):扇形交换机,就是订阅发布,生产者把消息发给给RabbitMQ---->RabbitMQ再把消息发送给交换机----->然后再发送给所有队列----->发送给消费者

  • 直连交换机(direct):直连交换机,就是指定订阅发布,生产者把消息发送给RabbitMQ---->RabbitMQ再把消息发送给交换机----->然后再发送给指定队列(通过routingKey匹配)----->发送给消费者

  • 主题交换机(topic):主题交换机,就是不同生产者匹配到相同消费者,生产者把消息发送给RabbitMQ---->RabbitMQ再把消息发送给交换机----->然后再发送给指定队列----->发送给消费者。* 号的缺陷:只能匹配一级,# 能够匹配一级及多级以上。

  • RPC回调:客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败); 服务器端收到消息并处理;服务器端处理完消息后0,0将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性;客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。

    //生产者
    var connection = RabbitMQConnectionFactory.CreateRabbitMQConnection(_configuration);
    var channel = connection.CreateModel();
    
    string productJson = JsonConvert.SerializeObject(productCreateDto);
    var body = Encoding.UTF8.GetBytes(productJson);
    //定义队列
    var queueName = channel.QueueDeclare().QueueName;
    var properties=channel.CreateBasicProperties();
    var correlationId = Guid.NewGuid().ToString();
    properties.CorrelationId = correlationId;
    properties.ReplyTo = queueName;
    properties.Persistent = true;//消息持久化
    //发送消息
    channel.BasicPublish("","Product_Create2",properties,body);
    
    //消息回调
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        Console.WriteLine($"model:{model}");
        var body = ea.Body;
        // 1、业务逻辑处理
        var message = Encoding.UTF8.GetString(body.ToArray());
        if (ea.BasicProperties.CorrelationId == correlationId)
        {
            Console.WriteLine(" [x] 回调成功 {0}", message);
        }
    
    };
    
    channel.BasicQos(0, 1, false);
    channel.BasicConsume(queueName, true, consumer);
    
    ======================================================================
    
    //消费者
    var connection = RabbitMQConnectionFactory.CreateRabbitMQConnection(_configuration);
    var channel = connection.CreateModel();
    var queue = channel.QueueDeclare("Product_Create2", false, false, false, null);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        Console.WriteLine($"model:{model}");
        var body = ea.Body;
    
        var props = ea.BasicProperties;
        var replyProps = channel.CreateBasicProperties();
        replyProps.CorrelationId = props.CorrelationId;
    
        try
        {
            // 1、执行业务
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine(" [x] 创建商品 {0}", message);
    
        }
        catch (Exception ex)
        {
            Console.WriteLine(" [.] " + ex.Message);
        }
        finally
        {
            Console.WriteLine("发送回调消息");
            var responseBytes = Encoding.UTF8.GetBytes("商品回调成功");
            channel.BasicPublish("", props.ReplyTo, basicProperties: replyProps, body: responseBytes);
        }
    };
    channel.BasicQos(0, 1, false);// Qos(防止多个消费者,能力不一致,导致的系统质量问题。每一次一个消费者只成功消费一个)
    
    channel.BasicConsume("Product_Create2", true, consumer); 
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71