和领域事件不同,集成事件用于在微服务间进行事件的传递,因为这是服务器间的通信,所以必须借助第三方服务器作为事件总线。
我们一般使用消息中间件来作为事件总线,目前常用的消息中间件有Redis、RabbitMQ、Kafka、ActiveMQ等,这里我们使用RabbitMQ。
我们先来了解一下RabbitMQ中的几个基本概念:
(1)渠道(channel):
渠道是消息的生产者、消费者和服务器之间进行通信的虚拟连接。
为什么叫”虚拟连接”呢?因为TCP连接的建立是非常消耗资源的,所以RabbitMQ在TCP连接的基础上构建了虚拟渠道。
我们尽量重复使用TCP连接,而渠道是可以用完就关闭的。
(2)队列(queue):
队列是用来进行消息收发的地方,生产者把消息放到队列中,消费者从队列中获取消息。
(3)交换机(exchange):
交换机用于把消息路由到一个或者多个队列中。
RabbitMQ有非常多的使用模式,这里介绍在集成事件中用到的模式,即routing模式,如下图所示:

在这种模式中,生产者把消息发布到交换机中,消息会携带routingKey属性,交换机会根据routingKey的值把消息发送到一个或者多个队列;
消费者会从队列中获取消息;
交换机和队列都位于RabbitMQ服务器内部。
这种模式的优点在于,即使消费者不在线,消费者相关的消息也会被保存到队列中,当消费者上线之后,就可以获取离线期间错过的消息。
我们知道,在软件系统中,消息的生产者和消费者都不可能24小时在线,这种模式可以保证消费者收到因为服务器重启等原因而错过的消息。
RabbitMQ服务的安装比较简单,这里主要讲解如何在.NET中连接RabbitMQ进行消息收发。
首先,我们分别创建发送消息的项目和接受消息的控制台项目,这两个项目都需要安装NuGet包RabbitMQ.Client。
接下来,我们如以下代码所示进行消息发送:
var factory = new ConnectionFactory
{
//RabbitMQ服务器地址
HostName = "127.0.0.1"
};
//交换机的名字
string exchangeName = "exchangeTest";
//routingKey的值
string eventName = "myEvent";
using var connection = await factory.CreateConnectionAsync();
while (true)
{
//待发送消息
string message = $"Hello RabbitMQ! {DateTime.Now}";
//创建虚拟信道
using var channel = await connection.CreateChannelAsync();
var properties = new BasicProperties
{
DeliveryMode = DeliveryModes.Persistent // 设置消息持久化
};
//声明交换机
await channel.ExchangeDeclareAsync(exchange:exchangeName,type:"direct");
byte[] body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange: exchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body);
Console.WriteLine("发布了消息:"+message);
await Task.Delay(1000); // 每秒发送一次消息
}
最后,我们来编写消息的消费端项目的代码,如以下代码所示:
var factory = new ConnectionFactory
{
HostName = "127.0.0.1",
};
string exchangeName = "exchangeTest1";
string eventName = "myEvent";
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
string queueName = "queue1";
//声明了和消息发送端的名字一样的交换机,如果消息发送端已经声明了同名的交换机,这里的调用就会被忽略
await channel.ExchangeDeclareAsync(exchange: exchangeName, type: "direct");
//声明了一个队列用于接收交换机转发过来的消息,若已经存在指定名字的队列,则该调用会被忽略
await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
//把队列绑定到交换机中,并且设定了routingKet参数。
await channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: eventName);
//创建一个AsyncEventingBasicConsumer对象,用于从队列中接收消息
var consumer = new AsyncEventingBasicConsumer(channel);
//一条消息被接收到的时候,Received事件就会被触发,我们在ConsumerReceived方法中处理接收到的消息
consumer.ReceivedAsync += ConsumerReceived;
await channel.BasicConsumeAsync(queueName, autoAck: false, consumer:consumer);
Console.ReadLine();
async Task ConsumerReceived(object sender, BasicDeliverEventArgs e)
{
try
{
//把收到的消息转换为string类型,然后输出到控制台。
var bytes = e.Body.ToArray();
string message = Encoding.UTF8.GetString(bytes);
Console.WriteLine($"{DateTime.Now} 收到了消息 {message}");
await channel.BasicAckAsync(e.DeliveryTag,multiple: false);
await Task.Delay(1000); // 模拟处理时间
}
catch (Exception ex)
{
await channel.BasicRejectAsync(e.DeliveryTag, requeue: true);
Console.WriteLine("处理收到的消息出错"+ex);
}
}
完成上面的代码之后,我们分别运行消息发送端和消费端的程序。
如下图可以看到,两个程序能够得到预期的运行效果,即使消费端关闭,等消费端重新启动之后,也能收到离线的这段时间内错过的消息
