RabbitMQ 有 3 种运行模式,集群模式有 2 种。详细如下:
单机模式:
即单机情况不做集群,就单独运行一个 RabbitMQ 而已。
普通模式:
默认模式,以两个节点(node-1、node-2)为例来进行说明。对于 Queue 来说,消息实体只存在于其中一个节点 node-1(或者 node-2),node-1 和 node-2 两个节点仅有相同的元数据,即队列的结构。当消息进入 node-1 节点的 Queue 后,consumer 从 node-2 节点消费时,RabbitMQ 会临时在 node-1、node-2 间进行消息传输,把 A 中的消息实体取出并经过 B 发送给 consumer。所以 consumer 应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理 Queue。否则无论 consumer 连 node-1 或 node-2,出口总在 node-1,会产生瓶颈。当 node-1 节点故障后,node-2 节点无法取到 node-1 节点中还未消费的消息实体。如果做了消息持久化,那么得等 node-1 节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。
镜像模式:
一般配合HAProxy配置为高可用集群,把需要的队列做成镜像队列,存在与多个节点属于 RabbitMQ 的 HA 方案。该模式解决了普通模式中的问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用。
环境准备
3 台 CentOS Linux release 7.6.1810 (Core) :
- 192.168.0.3 (定义为中心节点,让其他两台加入,节点都是平等的可以选任意一台。)
- 192.168.0.4
- 192.168.0.5
三台机器都编辑/etc/hosts
映射主机名:
cat >> /etc/hosts << EOF 192.168.0.3 node-1 192.168.0.4 node-2 192.168.0.5 node-3 EOF
更新系统并安装依赖包:
yum -y update yum -y install gcc gcc-c++ glibc-devel autoconf make ncurses-devel openssl-devel xmlto socat unixODBC unixODBC-devel
关闭防火墙和selinux:
systemctl disable firewalld && systemctl stop firewalld setenforce 0 sed -i 's/^SELINUX=.*$/SELINUX=disabled/g' /etc/selinux/config
安装erlang
查看 erlang 和 rabbitmq 版本支持关系:https://www.rabbitmq.com/which-erlang.html,下载地址:https://github.com/erlang/otp/releases
wget https://github.com/erlang/otp/archive/OTP-21.3.7.1.tar.gz tar zxvf OTP-21.3.7.1.tar.gz && cd otp-OTP-21.3.7.1 ./otp_build autoconf ./configure --prefix=/usr/local/erlang --without-javac make make install
编辑/etc/profile
:
export ERLANG_HOME=/usr/local/erlang export PATH=$PATH:$ERLANG_HOME/bin
. /etc/profile
[root@node-1 ~]# erl Erlang/OTP 21 [erts-10.3.4] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1] [hipe] Eshell V10.3.4 (abort with ^G) 1>
二进制包方式安装RabbitMQ
在三台机器上分别安装rabbitmq,下载安装地址:https://dl.bintray.com/rabbitmq/all/rabbitmq-server,这里下载的是rabbitmq-server-generic-unix-3.7.16.tar.xz
tar xvf rabbitmq-server-generic-unix-3.7.16.tar.xz mv rabbitmq_server-3.7.16/ /usr/local/rabbitmq
修改/etc/profile
:
export RABBITMQ_HOME=/usr/local/rabbitmq
export PATH=$PATH:$RABBITMQ_HOME/sbin
. /etc/profile
ln -s /usr/local/rabbitmq/sbin/* /usr/local/sbin/
rabbitmq-server -detached # 后台启动 rabbitmqctl stop # 停止 rabbitmqctl status # 查看状态 rabbitmqctl environment # 查看有效的节点配置 rabbitmqctl node_health_check # 对本地节点的运行状况检查
账号配置
安装启动后其实还不能在其它机器访问,rabbitmq 默认的 guest 账号只能在本地机器访问,如果想在其它机器访问必须配置其它账号:
# 创建管理员用户,负责整个MQ的运维 rabbitmqctl add_user admin pwd-for-admin # 赋予其administrator角色 rabbitmqctl set_user_tags admin administrator # 使用户admin具有/这个virtual host中所有资源的配置、写、读权限以便管理其中的资源 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" # 查看用户和角色 rabbitmqctl list_users # 启动 rabbitmq 内置 web 插件, 管理 rabbitmq 账号等信息 rabbitmq-plugins enable rabbitmq_management
启动后可以使用:http://ip:15672
访问web界面,以上三台机器同样的操作,安装启动之后此时每台机器运行着单机模式的RabbitMQ。
配置 RabbitMQ 集群
Erlang Cookie
RabbitMQ 底层是通过 Erlang 架构来实现的,所以 rabbitmqctl 会启动 Erlang 节点,并基于 Erlang 节点来使用 Erlang 系统连接 RabbitMQ 节点,在连接过程中需要正确的 Erlang Cookie 和节点名称,Erlang 节点通过交换 Erlang Cookie 以获得认证。
Erlang Cookie 是保证不同节点可以相互通信的密钥,要保证集群中的不同节点相互通信必须共享相同的 Erlang Cookie。rpm安装存放在/var/lib/rabbitmq/.erlang.cookie
,二进制安装存放在/root/.erlang.cookie
,权限为 owner 只读。
普通集群创建
三台机器都安装好rabbitmq之后,把主节点的.erlang.cookie
复制到各节点:
先在 node-2、node-3 备份原有的.erlang.cookie
文件,然后先停止 rabbitmq 服务,如果不先停服 cookie 文件被修改后,后面操作无法正常停止。
cp /root/.erlang.cookie /root/.erlang.cookie.bak rabbitmqctl stop
在 node-1 复制.erlang.cookie
文件到各节点:
scp /root/.erlang.cookie root@node-2:/root/ scp /root/.erlang.cookie root@node-3:/root/
所有节点设置权限:
chmod 400 /root/.erlang.cookie
在 node-2、node-3 在启动 rabbitmq 服务:
rabbitmq-server -detached
在node-2、node-3节点执行下面命令,加入集群:
rabbitmqctl stop_app # 停止 rabbitmq 应用,没有停止 Erlang VM rabbitmqctl reset # 退出集群并恢复初始状态 # 默认是磁盘节点,这里需要已内存节点(--ram)需要加入集群 rabbitmqctl join_cluster --ram rabbit@node-1 rabbitmqctl start_app # 开启 rabbitmq 应用
查看群集状态:
rabbitmqctl cluster_status
现在可以用过网页控制台查看集群状态:
镜像集群创建
使用 rabbitmq 作为消息服务时,在服务负载不是很大的情况下,一般我们只需要一个 rabbitmq 节点便能为我们提供服务,可这难免会发生单点故障,要解决这个问题,我们便需要配置 rabbitmq 的集群和镜像。
镜像集群是在普通集群的基础上执行一条设置镜像队列策略的命令即可,在任意一个节点上执行:
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
参数解释:
ha-all
为策略名称。^
为匹配符,只有一个^
代表匹配所有,^zlh
为匹配名称为zlh
的 exchanges 或者 queue。ha-mode
为匹配类型,分为 3 种模式:
all
所有(所有的queue)
exctly
部分(需配置ha-params参数,此参数为int类型比如3,众多集群中的随机3台机器)
nodes
指定(需配置ha-params参数,此参数为数组类型比如["rabbit@node-1","rabbit@node-2",,"rabbit@node-3"]
这样指定为3台机器。)
查看策略:
rabbitmqctl list_policies
集群的常见运维操作
若某节点从集群退出,重置回独立节点,操作命令:
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app
若某节点需要停机维护,操作命令:
rabbitmqctl stop # 先停止服务 rabbitmq-server -detached # 维护完成后开启服务即可
若要修改某个节点类型,操作命令:
rabbitmqctl stop_app rabbitmqctl change_cluster_node_type disc
若要从集群种移除某各节点(最好使用第一种退出集群方法):
rabbitmqctl -n rabbit@node-3 stop_app rabbitmqctl forget_cluster_node rabbit@node-3
配置RabbitMQ的负载均衡
完成镜像队列设置之后,每各队列会被复制到各个节点,各个节点状态保持一致。因为 RabbitMQ 本身不提供负载均衡,需要搭建负载均衡器来提供负载转发,可以选择 HAProxy 和 Nginx。
HAProxy 负载方案
编译安装 PCRE
wget https://ftp.pcre.org/pub/pcre/pcre-8.42.tar.gz tar zxvf pcre-8.42.tar.gz && cd pcre-8.42 ./configure && make && make install
编译安装 HAProxy
# 下载后编译安装 wget https://www.haproxy.org/download/1.8/src/haproxy-1.8.20.tar.gz tar zxvf haproxy-1.8.20.tar.gz && cd haproxy-1.8.20/ make TARGET=linux2628 ARCH=x86_64 PREFIX=/usr/local/haproxy USE_PCRE=1 make install PREFIX=/usr/local/haproxy ln -s /usr/local/haproxy/sbin/haproxy /usr/sbin/ # 创建配置文件目录: mkdir /etc/haproxy # 创建用户: useradd -s /sbin/nologin -M haproxy
HAProxy 配置文件/etc/haproxy/haproxy.cfg
global
log 127.0.0.1 local0 notice
log 127.0.0.1 local1 info
pidfile /var/run/haproxy.pid
maxconn 4096
user haproxy
group haproxy
daemon
defaults
log global
option dontlognull
option httpclose
option redispatch
balance roundrobin
maxconn 4096
timeout connect 5s
timeout client 5s
timeout server 3s
timeout check 5s
retries 3
# 统计页面配置
listen admin_stats
bind 0.0.0.0:8100
stats enable
mode http
log global
stats uri /stats
stats realm Haproxy Statistics
stats auth admin:admin
stats admin if TRUE
stats refresh 20s
# RabbitMQ 管理页面
listen rabbitmq_admin
bind 0.0.0.0:8010
mode http
server node-1 node-1:15672
server node-2 node-2:15672
server node-3 node-3:15672
# RabbitMQ 服务
listen rabbitmq_cluster
bind 0.0.0.0:8020
mode tcp
option tcplog
balance roundrobin
timeout client 3h
timeout server 3h
server node-1 node-1:5672 check inter 5000 rise 2 fall 3
server node-2 node-2:5672 check inter 5000 rise 2 fall 3
server node-3 node-3:5672 check inter 5000 rise 2 fall 3
haproxy 常用命令:
# 检查配置文件语法 haproxy -c -f /etc/haproxy/haproxy.cfg # 启动 haproxy haproxy -f /etc/haproxy/haproxy.cfg # restart haproxy -f /etc/haproxy.cfg -st `cat /var/run/haproxy.pid` # reload haproxy -f /etc/haproxy.cfg -sf `cat /var/run/haproxy.pid`
Nginx 负载方案
nginx安装方法略过,版本需要大于1.9.0,且编译时配置--with-stream
参数。
nginx配置文件:
worker_processes auto;
error_log logs/nginx_error.log crit;
pid logs/nginx.pid;
worker_rlimit_nofile 51200;
events {
use epoll;
worker_connections 51200;
multi_accept on;
}
stream {
upstream rabbitmq {
server 192.168.0.3:5672;
server 192.168.0.4:5672;
server 192.168.0.5:5672;
}
server {
listen 5678;
proxy_connect_timeout 1s;
proxy_timeout 600s;
proxy_pass rabbitmq;
}
}
以上用 haproxy 或 nginx 实现了 rabbitmq 的负载均衡,如果要实现高可用还需要配置 Keepalived 实现。
测试rabbitmq集群
安装pika模块:
pip install pika
创建生产者 producer.py 文件:
# -*- coding: utf-8 -*-
import pika
credentials = pika.PlainCredentials('admin', 'pwd-for-admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.0.3', 5678, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='this is a test message.')
print("开始队列...")
connection.close()
创建消费者 consumer.py 文件:
# -*- coding: utf-8 -*-
import pika
credentials = pika.PlainCredentials('admin', 'pwd-for-admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.0.3', 5678, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')
# 回调函数,收到消息后回调
def callback(ch, method, properties, body):
print("消费列队消息: %s" % body)
channel.basic_consume(queue='hello',
on_message_callback=callback,
auto_ack=True)
print('正在等待消息...')
channel.start_consuming()
查看rabbitmq列队和属性:
rabbitmqctl list_queues
RabbitMQ 的用户权限 VirtualHost 和角色概念补充
像 MySQL 有数据库的概念并且可以指定用户对库和表等操作的权限。那 RabbitMQ 呢?RabbitMQ 也有类似的权限管理。在 RabbitMQ 中可以虚拟消息服务器 VirtualHost,每个 VirtualHost 相当月一个相对独立的 RabbitMQ 服务器,每个 VirtualHost 之间是相互隔离的,exchange、queue、message 不能互通。
在 RabbitMQ 中无法通过 AMQP 创建 VirtualHost,可以通过rabbitmqctl add_vhost [vhostname]
命令来创建。
通常在权限管理中主要包含三步:
1、新建用户
rabbitmqctl add_user superrd pwd-for-superrd
2、配置权限
set_permissions [-p ] <user> <conf> <write> <read> rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
其中".*"
的位置分别用正则表达式来匹配特定的资源,如'^(amq.gen.*|amq.default)$'
可以匹配 server 生成的和默认的 exchange,'^$'
不匹配任何资源。"/"
代表 virtual host 为"/"
,把这个"/"
理解成字符串就行。
查看用户权限命令:
rabbitmqctl list_user_permissions admin rabbitmqctl list_permissions -p /
3、配置角色
rabbitmqctl set_user_tags [user] [role]
RabbitMQ 中的角色分为如下五类:none、management、policymaker、monitoring、administrator
例如创建监控用户,负责整个MQ的监控:
rabbitmqctl add_user mtuser pwd-for-mtuser rabbitmqctl set_user_tags mtuser monitoring
官方解释如下:
- none 不能访问 management plugin
- management 用户可以通过 AMQP 做的任何事外加:
列出自己可以通过 AMQP 登入的 virtual hosts
查看自己的 virtual hosts 中的 queues, exchanges 和 bindings
查看和关闭自己的 channels 和 connections
查看有关自己的 virtual hosts 的“全局”的统计信息,包含其他用户在这些 virtual hosts 中的活动。 - policymaker management 可以做的任何事外加:
查看、创建和删除自己的 virtual hosts 所属的 policies 和 parameters - monitoring management 可以做的任何事外加:
列出所有 virtual hosts,包括他们不能登录的 virtual hosts
查看其他用户的 connections 和 channels
查看节点级别的数据如 clustering 和 memory 使用情况
查看真正的关于所有 virtual hosts 的全局的统计信息 - administrator policymaker 和 monitoring 可以做的任何事外加:
创建和删除 virtual hosts
查看、创建和删除 users
查看创建和删除 permissions
关闭其他用户的 connections
暂无评论内容