1. 为什么选择 RabbitMQ?
在分布式系统中,消息队列(Message Queue)用于解耦服务、提高系统吞吐量。RabbitMQ 是一个基于 AMQP(高级消息队列协议)的消息代理,因其高可靠性、可扩展性、丰富的插件系统而被广泛应用。
RabbitMQ 适用场景:
o 微服务架构:不同服务之间解耦,提高系统稳定性
o 异步任务处理:降低系统负载,提高响应速度
o 事件驱动架构:支持实时数据流处理,如日志收集、用户行为跟踪
o 分布式系统通信:多个服务之间进行可靠的消息传递
2. 安装 RabbitMQ(适用于 Ubuntu / Debian)
2.1 安装 Erlang(RabbitMQ 依赖)
RabbitMQ 依赖 Erlang 运行环境,先安装 Erlang:
sudo apt update && sudo apt install -y erlang
安装完成后,检查版本:
erl -version
2.2 安装 RabbitMQ
从官方仓库下载并安装 RabbitMQ:
sudo apt install -y rabbitmq-server
启用 RabbitMQ 并设置为开机自启:
sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server
检查服务状态:
sudo systemctl status rabbitmq-server
如果 RabbitMQ 正常运行,会显示 active (running) 状态。
3. RabbitMQ 基础配置
3.1 启用管理插件
RabbitMQ 自带一个 Web 管理界面,默认未启用,可通过以下命令开启:
sudo rabbitmq-plugins enable rabbitmq_management
管理界面默认监听 15672 端口,可以通过浏览器访问:
http://服务器IP:15672
默认账号 guest,密码 guest(仅限本机访问)。
3.2 创建管理员账户
为了安全性,建议创建一个新管理员账户:
sudo rabbitmqctl add_user admin yourpassword
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
然后使用 admin / yourpassword 登录 Web 界面管理 RabbitMQ。
4. RabbitMQ 消息队列基础操作
4.1 创建虚拟主机(VHost)
RabbitMQ 使用 虚拟主机(VHost) 进行消息隔离,可以创建不同的 VHost 来管理不同的业务场景:
sudo rabbitmqctl add_vhost my_vhost
4.2 创建用户并赋权
创建一个 my_user 用户,并赋予 my_vhost 的读写权限:
sudo rabbitmqctl add_user my_user mypassword
sudo rabbitmqctl set_permissions -p my_vhost my_user ".*" ".*" ".*"
5. 使用 Python 连接 RabbitMQ
安装 pika 库(RabbitMQ 的 Python 客户端):
pip install pika
5.1 生产者(发送消息)
创建 producer.py:
import pika
# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='hello')
# 发送消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello, RabbitMQ!')
print(" [x] Sent 'Hello, RabbitMQ!'")
connection.close()
运行:
python producer.py
5.2 消费者(接收消息)
创建 consumer.py:
import pika
# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 确保队列存在
channel.queue_declare(queue='hello')
# 定义回调函数
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# 监听队列
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
运行:
python consumer.py
当 producer.py 发送消息后,consumer.py 就会收到并打印出来。
6. RabbitMQ 高级特性
6.1 交换机(Exchange)与路由(Routing)
RabbitMQ 使用 交换机 控制消息的分发方式:
o Direct(直连交换机):基于 routing_key 精确匹配
o Fanout(广播交换机):消息发送到所有绑定队列
o Topic(主题交换机):支持通配符匹配(logs.*,*.info)
o Headers(头部交换机):基于消息头属性进行匹配
示例:创建一个 direct 类型交换机并绑定队列:
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.queue_bind(exchange='direct_logs', queue='hello', routing_key='info')
6.2 持久化队列
默认 RabbitMQ 重启后消息会丢失,可以开启持久化:
channel.queue_declare(queue='hello', durable=True)
消息发送时也需要标记持久化:
channel.basic_publish(exchange='', routing_key='hello', body='Hello, RabbitMQ!',
properties=pika.BasicProperties(delivery_mode=2))
7. 监控与优化
7.1 RabbitMQ 状态监控
查看所有队列:
sudo rabbitmqctl list_queues
查看所有连接:
sudo rabbitmqctl list_connections
7.2 调优 RabbitMQ 性能
o 调整 rabbitmq.conf 文件中的 vm_memory_high_watermark,控制内存使用率
o 采用 Lazy Queue(懒加载队列),减少内存占用:
sudo rabbitmqctl set_policy lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
o 限制队列最大长度,防止队列堆积:
arguments={'x-max-length': 1000}
8. 总结
本教程从零开始搭建了一个 RabbitMQ 服务器,并介绍了基本的消息收发、用户管理、队列持久化及优化方法。RabbitMQ 在分布式架构、微服务、实时数据处理等场景中应用广泛,掌握这些知识能帮助你构建更高效的系统。