RabbitMQ是由Erlang语言编写的,也正因如此,在安装RabbitMQ之前需要安装Erlang。
1. 安装Erlang
下面首先演示Erlang的安装。
第一步:
先到Erlang的官网下载安装包

下载完成后,解压安装包,并配置安装目录,这里我们预备安装到/opy/erlang目录下:
Bash
//解压安装包
tar zxvf otp_src_27.3.4.tar.gz
//制定安装路径
./configure --prefix=/opt/erlang
第二步:
安装Erlang
Bash
make
make install
第三步:
修改/etc/profile配置文件,添加下面的环境变量:
Bash
ERLANG_HOME=/opt/erlang
export PATH=$PATH:$ERLANG_HOME/bin
export ERLANG_HOME
然后执行如下命令让配置文件生效:
Bash
source /etc/profile
最后 输入erl命令来验证Erlang是否安装成功,如果出现类似以下的提示即表示安装成功:
Bash
Erlang/OTP 27 [erts-15.2.7] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1] [jit:ns]
Eshell V15.2.7
2. 安装RabbitMQ
第一步:
下载并安装RabbitMQ的签名密钥
Bash
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo gpg --dearmor -o /usr/share/keyrings/rabbitmq-archive-keyring.gpg
第二步:
添加RabbitMQ的APT源:
Bash
echo "deb [signed-by=/usr/share/keyrings/rabbitmq-archive-keyring.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
//更新包列表
sudo apt update
最后安装RabbitMQ Server和RabbitMQ Management Plugin
Bash
sudo apt install rabbitmq-server -y
sudo rabbitmq-plugins enable rabbitmq_management
3. RabbitMQ的运行
任意打开一个Shell窗口,输入如下命令以运行RabbitMQ服务:
Bash
rabbitmq-server -detached
在rabbitmq-server 命令后面添加一个”-detached”参数是为了能够让RabbitMQ服务以守护进程的方式在后台运行,这样就不会因为当前Shell窗口的关闭而影响服务。
运行rabbitmqctl status命令查看RabbitMQ是否正常启动,示例如下:
Bash
root@qwer666:~# rabbitmqctl status
Status of node rabbit@qwer666 ...
Runtime
OS PID: 74326
OS: Linux
Uptime (seconds): 79780
Is under maintenance?: false
RabbitMQ version: 3.9.27
RabbitMQ release series support status: supported
Node name: rabbit@qwer666
Erlang configuration: Erlang/OTP 24 [erts-12.2.1] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1] [jit]
Crypto library: OpenSSL 3.0.2 15 Mar 2022
Erlang processes: 358 used, 1048576 limit
Scheduler run queue: 0
Cluster heartbeat timeout (net_ticktime): 60
Plugins
Enabled plugin file: /etc/rabbitmq/enabled_plugins
Enabled plugins:
* rabbitmq_management
* amqp_client
* rabbitmq_web_dispatch
* cowboy
* cowlib
* rabbitmq_management_agent
Data directory
Node data directory: /var/lib/rabbitmq/mnesia/rabbit@qwer666
Raft data directory: /var/lib/rabbitmq/mnesia/rabbit@qwer666/quorum/rabbit@qwer666
Config files
Log file(s)
* /var/log/rabbitmq/rabbit@qwer666.log
* /var/log/rabbitmq/rabbit@qwer666_upgrade.log
* <stdout>
Alarms
(none)
Memory
Total memory used: 0.1326 gb
Calculation strategy: rss
Memory high watermark setting: 0.4 of available memory, computed to: 0.6749 gb
reserved_unallocated: 0.0823 gb (62.11 %)
code: 0.0355 gb (26.74 %)
other_proc: 0.0183 gb (13.8 %)
other_system: 0.0132 gb (9.96 %)
other_ets: 0.0034 gb (2.54 %)
plugins: 0.0015 gb (1.12 %)
atom: 0.0014 gb (1.08 %)
binary: 4.0e-4 gb (0.3 %)
metrics: 2.0e-4 gb (0.17 %)
mgmt_db: 2.0e-4 gb (0.17 %)
mnesia: 1.0e-4 gb (0.07 %)
msg_index: 0.0 gb (0.02 %)
quorum_ets: 0.0 gb (0.02 %)
connection_other: 0.0 gb (0.0 %)
stream_queue_procs: 0.0 gb (0.0 %)
stream_queue_replica_reader_procs: 0.0 gb (0.0 %)
allocated_unused: 0.0 gb (0.0 %)
connection_channels: 0.0 gb (0.0 %)
connection_readers: 0.0 gb (0.0 %)
connection_writers: 0.0 gb (0.0 %)
queue_procs: 0.0 gb (0.0 %)
queue_slave_procs: 0.0 gb (0.0 %)
quorum_queue_procs: 0.0 gb (0.0 %)
stream_queue_coordinator_procs: 0.0 gb (0.0 %)
File Descriptors
Total: 2, limit: 65439
Sockets: 0, limit: 58893
Free Disk Space
Low free disk space watermark: 0.05 gb
Free disk space: 35.6248 gb
Totals
Connection count: 0
Queue count: 0
Virtual host count: 1
Listeners
Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Interface: [::], port: 15672, protocol: http, purpose: HTTP API
如果RabbitMQ正常启动,会输出如上所示的信息。
当然也可以通过rabbitmqctl cluster_status命令来查看集群信息,目前只有一个RabbitMQ服务节点,可以看作单节点的集群:
Bash
Cluster status of node rabbit@qwer666 ...
Basics
Cluster name: rabbit@qwer666
Disk Nodes
rabbit@qwer666
Running Nodes
rabbit@qwer666
Versions
rabbit@qwer666: RabbitMQ 3.9.27 on Erlang 24.2.1
Maintenance status
Node: rabbit@qwer666, status: not under maintenance
Alarms
(none)
Network Partitions
(none)
Listeners
Node: rabbit@qwer666, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@qwer666, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@qwer666, interface: [::], port: 15672, protocol: http, purpose: HTTP API
Feature flags
Flag: drop_unroutable_metric, state: disabled
Flag: empty_basic_get_metric, state: disabled
Flag: implicit_default_bindings, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: stream_queue, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled
4. 生产和消费消息
这里将演示如何使用RabbitMQ .NET 客户端生产和消费消息。
生产者客户端代码:
C#
var factory = new ConnectionFactory
{
//RabbitMQ服务器地址
HostName = "127.0.0.1"
};
//交换机的名字
string exchangeName = "exchangeTest";
//routingKey的值
string eventName = "myEvent";
using var connection = await factory.CreateConnectionAsync();
while (true)
{
//待发送消息
string message = $"Hello RabbitMQ! {DateTime.Now}";
//创建虚拟信道
using var channel = await connection.CreateChannelAsync();
var properties = new BasicProperties
{
DeliveryMode = DeliveryModes.Persistent // 设置消息持久化
};
//声明交换机
await channel.ExchangeDeclareAsync(exchange:exchangeName,type:"direct");
byte[] body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange: exchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body);
Console.WriteLine("发布了消息:"+message);
await Task.Delay(1000); // 每秒发送一次消息
}
消费者客户端代码:
C#
var factory = new ConnectionFactory
{
HostName = "127.0.0.1",
};
string exchangeName = "exchangeTest1";
string eventName = "myEvent";
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
string queueName = "queue1";
//声明了和消息发送端的名字一样的交换机,如果消息发送端已经声明了同名的交换机,这里的调用就会被忽略
await channel.ExchangeDeclareAsync(exchange: exchangeName, type: "direct");
//声明了一个队列用于接收交换机转发过来的消息,若已经存在指定名字的队列,则该调用会被忽略
await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
//把队列绑定到交换机中,并且设定了routingKet参数。
await channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: eventName);
//创建一个AsyncEventingBasicConsumer对象,用于从队列中接收消息
var consumer = new AsyncEventingBasicConsumer(channel);
//一条消息被接收到的时候,Received事件就会被触发,我们在ConsumerReceived方法中处理接收到的消息
consumer.ReceivedAsync += ConsumerReceived;
await channel.BasicConsumeAsync(queueName, autoAck: false, consumer:consumer);
Console.ReadLine();
async Task ConsumerReceived(object sender, BasicDeliverEventArgs e)
{
try
{
//把收到的消息转换为string类型,然后输出到控制台。
var bytes = e.Body.ToArray();
string message = Encoding.UTF8.GetString(bytes);
Console.WriteLine($"{DateTime.Now} 收到了消息 {message}");
await channel.BasicAckAsync(e.DeliveryTag,multiple: false);
await Task.Delay(1000); // 模拟处理时间
}
catch (Exception ex)
{
await channel.BasicRejectAsync(e.DeliveryTag, requeue: true);
Console.WriteLine("处理收到的消息出错"+ex);
}
}