RabbitMQ简介 — RabbitMQ的安装及简单使用

RabbitMQ是由Erlang语言编写的,也正因如此,在安装RabbitMQ之前需要安装Erlang。

1. 安装Erlang

下面首先演示Erlang的安装。

第一步:

先到Erlang的官网下载安装包

下载完成后,解压安装包,并配置安装目录,这里我们预备安装到/opy/erlang目录下:

Bash
//解压安装包
tar zxvf otp_src_27.3.4.tar.gz
 
//制定安装路径
./configure --prefix=/opt/erlang

第二步:

安装Erlang

Bash
make
make install

第三步:

修改/etc/profile配置文件,添加下面的环境变量:

Bash
ERLANG_HOME=/opt/erlang
export PATH=$PATH:$ERLANG_HOME/bin
export ERLANG_HOME

然后执行如下命令让配置文件生效:

Bash
source /etc/profile

最后 输入erl命令来验证Erlang是否安装成功,如果出现类似以下的提示即表示安装成功:

Bash
Erlang/OTP 27 [erts-15.2.7] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1] [jit:ns]

Eshell V15.2.7 

2. 安装RabbitMQ

第一步:

下载并安装RabbitMQ的签名密钥

Bash
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc  | sudo gpg --dearmor -o /usr/share/keyrings/rabbitmq-archive-keyring.gpg

第二步:

添加RabbitMQ的APT源:

Bash
echo "deb [signed-by=/usr/share/keyrings/rabbitmq-archive-keyring.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu  $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list

//更新包列表
sudo apt update

最后安装RabbitMQ Server和RabbitMQ Management Plugin

Bash
sudo apt install rabbitmq-server -y

sudo rabbitmq-plugins enable rabbitmq_management

3. RabbitMQ的运行

任意打开一个Shell窗口,输入如下命令以运行RabbitMQ服务:

Bash
rabbitmq-server -detached

在rabbitmq-server 命令后面添加一个”-detached”参数是为了能够让RabbitMQ服务以守护进程的方式在后台运行,这样就不会因为当前Shell窗口的关闭而影响服务。

运行rabbitmqctl status命令查看RabbitMQ是否正常启动,示例如下:

Bash
root@qwer666:~# rabbitmqctl status
Status of node rabbit@qwer666 ...
Runtime

OS PID: 74326
OS: Linux
Uptime (seconds): 79780
Is under maintenance?: false
RabbitMQ version: 3.9.27
RabbitMQ release series support status: supported
Node name: rabbit@qwer666
Erlang configuration: Erlang/OTP 24 [erts-12.2.1] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1] [jit]
Crypto library: OpenSSL 3.0.2 15 Mar 2022
Erlang processes: 358 used, 1048576 limit
Scheduler run queue: 0
Cluster heartbeat timeout (net_ticktime): 60

Plugins

Enabled plugin file: /etc/rabbitmq/enabled_plugins
Enabled plugins:

 * rabbitmq_management
 * amqp_client
 * rabbitmq_web_dispatch
 * cowboy
 * cowlib
 * rabbitmq_management_agent

Data directory

Node data directory: /var/lib/rabbitmq/mnesia/rabbit@qwer666
Raft data directory: /var/lib/rabbitmq/mnesia/rabbit@qwer666/quorum/rabbit@qwer666

Config files


Log file(s)

 * /var/log/rabbitmq/rabbit@qwer666.log
 * /var/log/rabbitmq/rabbit@qwer666_upgrade.log
 * <stdout>

Alarms

(none)

Memory

Total memory used: 0.1326 gb
Calculation strategy: rss
Memory high watermark setting: 0.4 of available memory, computed to: 0.6749 gb

reserved_unallocated: 0.0823 gb (62.11 %)
code: 0.0355 gb (26.74 %)
other_proc: 0.0183 gb (13.8 %)
other_system: 0.0132 gb (9.96 %)
other_ets: 0.0034 gb (2.54 %)
plugins: 0.0015 gb (1.12 %)
atom: 0.0014 gb (1.08 %)
binary: 4.0e-4 gb (0.3 %)
metrics: 2.0e-4 gb (0.17 %)
mgmt_db: 2.0e-4 gb (0.17 %)
mnesia: 1.0e-4 gb (0.07 %)
msg_index: 0.0 gb (0.02 %)
quorum_ets: 0.0 gb (0.02 %)
connection_other: 0.0 gb (0.0 %)
stream_queue_procs: 0.0 gb (0.0 %)
stream_queue_replica_reader_procs: 0.0 gb (0.0 %)
allocated_unused: 0.0 gb (0.0 %)
connection_channels: 0.0 gb (0.0 %)
connection_readers: 0.0 gb (0.0 %)
connection_writers: 0.0 gb (0.0 %)
queue_procs: 0.0 gb (0.0 %)
queue_slave_procs: 0.0 gb (0.0 %)
quorum_queue_procs: 0.0 gb (0.0 %)
stream_queue_coordinator_procs: 0.0 gb (0.0 %)

File Descriptors

Total: 2, limit: 65439
Sockets: 0, limit: 58893

Free Disk Space

Low free disk space watermark: 0.05 gb
Free disk space: 35.6248 gb

Totals

Connection count: 0
Queue count: 0
Virtual host count: 1

Listeners

Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Interface: [::], port: 15672, protocol: http, purpose: HTTP API

如果RabbitMQ正常启动,会输出如上所示的信息。

当然也可以通过rabbitmqctl cluster_status命令来查看集群信息,目前只有一个RabbitMQ服务节点,可以看作单节点的集群:

Bash
Cluster status of node rabbit@qwer666 ...
Basics

Cluster name: rabbit@qwer666

Disk Nodes

rabbit@qwer666

Running Nodes

rabbit@qwer666

Versions

rabbit@qwer666: RabbitMQ 3.9.27 on Erlang 24.2.1

Maintenance status

Node: rabbit@qwer666, status: not under maintenance

Alarms

(none)

Network Partitions

(none)

Listeners

Node: rabbit@qwer666, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@qwer666, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@qwer666, interface: [::], port: 15672, protocol: http, purpose: HTTP API

Feature flags

Flag: drop_unroutable_metric, state: disabled
Flag: empty_basic_get_metric, state: disabled
Flag: implicit_default_bindings, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: stream_queue, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled

4. 生产和消费消息

这里将演示如何使用RabbitMQ .NET 客户端生产和消费消息。

生产者客户端代码:

C#
var factory = new ConnectionFactory
{
    //RabbitMQ服务器地址
    HostName = "127.0.0.1"
};

//交换机的名字
string exchangeName = "exchangeTest";

//routingKey的值
string eventName = "myEvent";

using var connection = await factory.CreateConnectionAsync();


while (true)
{

    //待发送消息
    string message = $"Hello RabbitMQ! {DateTime.Now}";

    //创建虚拟信道
    using var channel = await connection.CreateChannelAsync();

    var properties = new BasicProperties
    {
       DeliveryMode = DeliveryModes.Persistent // 设置消息持久化
    };


    //声明交换机
    await channel.ExchangeDeclareAsync(exchange:exchangeName,type:"direct");

    byte[] body = Encoding.UTF8.GetBytes(message);

    await channel.BasicPublishAsync(exchange: exchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body);


    Console.WriteLine("发布了消息:"+message);


    await Task.Delay(1000); // 每秒发送一次消息

}

消费者客户端代码:

C#
var factory = new ConnectionFactory
{
    HostName = "127.0.0.1",
	
};

string exchangeName = "exchangeTest1";

string eventName = "myEvent";


using var connection = await factory.CreateConnectionAsync(); 

using var channel = await connection.CreateChannelAsync();

string queueName = "queue1";

//声明了和消息发送端的名字一样的交换机,如果消息发送端已经声明了同名的交换机,这里的调用就会被忽略
await channel.ExchangeDeclareAsync(exchange: exchangeName, type: "direct");

//声明了一个队列用于接收交换机转发过来的消息,若已经存在指定名字的队列,则该调用会被忽略
await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

//把队列绑定到交换机中,并且设定了routingKet参数。
await channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: eventName);


//创建一个AsyncEventingBasicConsumer对象,用于从队列中接收消息
var consumer = new AsyncEventingBasicConsumer(channel);

//一条消息被接收到的时候,Received事件就会被触发,我们在ConsumerReceived方法中处理接收到的消息
consumer.ReceivedAsync += ConsumerReceived;

await channel.BasicConsumeAsync(queueName, autoAck: false, consumer:consumer);

Console.ReadLine();




async Task ConsumerReceived(object sender, BasicDeliverEventArgs e)
{

	try
	{

		//把收到的消息转换为string类型,然后输出到控制台。
		var bytes = e.Body.ToArray();

		string message = Encoding.UTF8.GetString(bytes);

        Console.WriteLine($"{DateTime.Now} 收到了消息 {message}");

		await channel.BasicAckAsync(e.DeliveryTag,multiple: false);

		await Task.Delay(1000); // 模拟处理时间



    }
	catch (Exception ex)
	{

		await channel.BasicRejectAsync(e.DeliveryTag, requeue: true);

        Console.WriteLine("处理收到的消息出错"+ex);

    }


}
订阅评论
提醒
0 评论
最旧
最新 最多投票
内联反馈
查看所有评论
滚动至顶部