DDD的技术落地 — RabbitMQ的基本使用

和领域事件不同,集成事件用于在微服务间进行事件的传递,因为这是服务器间的通信,所以必须借助第三方服务器作为事件总线。

我们一般使用消息中间件来作为事件总线,目前常用的消息中间件有Redis、RabbitMQ、Kafka、ActiveMQ等,这里我们使用RabbitMQ。

我们先来了解一下RabbitMQ中的几个基本概念:

(1)渠道(channel):

渠道是消息的生产者、消费者和服务器之间进行通信的虚拟连接。

为什么叫”虚拟连接”呢?因为TCP连接的建立是非常消耗资源的,所以RabbitMQ在TCP连接的基础上构建了虚拟渠道。

我们尽量重复使用TCP连接,而渠道是可以用完就关闭的。

(2)队列(queue):

队列是用来进行消息收发的地方,生产者把消息放到队列中,消费者从队列中获取消息。

(3)交换机(exchange):

交换机用于把消息路由到一个或者多个队列中。

RabbitMQ有非常多的使用模式,这里介绍在集成事件中用到的模式,即routing模式,如下图所示:

在这种模式中,生产者把消息发布到交换机中,消息会携带routingKey属性,交换机会根据routingKey的值把消息发送到一个或者多个队列;

消费者会从队列中获取消息;

交换机和队列都位于RabbitMQ服务器内部。

这种模式的优点在于,即使消费者不在线,消费者相关的消息也会被保存到队列中,当消费者上线之后,就可以获取离线期间错过的消息。

我们知道,在软件系统中,消息的生产者和消费者都不可能24小时在线,这种模式可以保证消费者收到因为服务器重启等原因而错过的消息。

RabbitMQ服务的安装比较简单,这里主要讲解如何在.NET中连接RabbitMQ进行消息收发。

首先,我们分别创建发送消息的项目和接受消息的控制台项目,这两个项目都需要安装NuGet包RabbitMQ.Client。

接下来,我们如以下代码所示进行消息发送:

C#
var factory = new ConnectionFactory
{
    //RabbitMQ服务器地址
    HostName = "127.0.0.1"
};

//交换机的名字
string exchangeName = "exchangeTest";

//routingKey的值
string eventName = "myEvent";

using var connection = await factory.CreateConnectionAsync();


while (true)
{

    //待发送消息
    string message = $"Hello RabbitMQ! {DateTime.Now}";

    //创建虚拟信道
    using var channel = await connection.CreateChannelAsync();

    var properties = new BasicProperties
    {
       DeliveryMode = DeliveryModes.Persistent // 设置消息持久化
    };


    //声明交换机
    await channel.ExchangeDeclareAsync(exchange:exchangeName,type:"direct");

    byte[] body = Encoding.UTF8.GetBytes(message);

    await channel.BasicPublishAsync(exchange: exchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body);


    Console.WriteLine("发布了消息:"+message);


    await Task.Delay(1000); // 每秒发送一次消息

}

最后,我们来编写消息的消费端项目的代码,如以下代码所示:

C#
var factory = new ConnectionFactory
{
    HostName = "127.0.0.1",
	
};

string exchangeName = "exchangeTest1";

string eventName = "myEvent";


using var connection = await factory.CreateConnectionAsync(); 

using var channel = await connection.CreateChannelAsync();

string queueName = "queue1";

//声明了和消息发送端的名字一样的交换机,如果消息发送端已经声明了同名的交换机,这里的调用就会被忽略
await channel.ExchangeDeclareAsync(exchange: exchangeName, type: "direct");

//声明了一个队列用于接收交换机转发过来的消息,若已经存在指定名字的队列,则该调用会被忽略
await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

//把队列绑定到交换机中,并且设定了routingKet参数。
await channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: eventName);


//创建一个AsyncEventingBasicConsumer对象,用于从队列中接收消息
var consumer = new AsyncEventingBasicConsumer(channel);

//一条消息被接收到的时候,Received事件就会被触发,我们在ConsumerReceived方法中处理接收到的消息
consumer.ReceivedAsync += ConsumerReceived;

await channel.BasicConsumeAsync(queueName, autoAck: false, consumer:consumer);

Console.ReadLine();




async Task ConsumerReceived(object sender, BasicDeliverEventArgs e)
{

	try
	{

		//把收到的消息转换为string类型,然后输出到控制台。
		var bytes = e.Body.ToArray();

		string message = Encoding.UTF8.GetString(bytes);

        Console.WriteLine($"{DateTime.Now} 收到了消息 {message}");

		await channel.BasicAckAsync(e.DeliveryTag,multiple: false);

		await Task.Delay(1000); // 模拟处理时间



    }
	catch (Exception ex)
	{

		await channel.BasicRejectAsync(e.DeliveryTag, requeue: true);

        Console.WriteLine("处理收到的消息出错"+ex);

    }


}

完成上面的代码之后,我们分别运行消息发送端和消费端的程序。

如下图可以看到,两个程序能够得到预期的运行效果,即使消费端关闭,等消费端重新启动之后,也能收到离线的这段时间内错过的消息

订阅评论
提醒
0 评论
最旧
最新 最多投票
内联反馈
查看所有评论
滚动至顶部