消息队列 RabbitMQ 单机模式、集群、镜像集群+负载均衡

RabbitMQ 有 3 种运行模式,集群模式有 2 种。详细如下:
单机模式:
即单机情况不做集群,就单独运行一个 RabbitMQ  而已。
普通模式:
默认模式,以两个节点(node-1、node-2)为例来进行说明。对于 Queue 来说,消息实体只存在于其中一个节点 node-1(或者 node-2),node-1 和 node-2 两个节点仅有相同的元数据,即队列的结构。当消息进入 node-1 节点的 Queue 后,consumer 从 node-2 节点消费时,RabbitMQ 会临时在 node-1、node-2 间进行消息传输,把 A 中的消息实体取出并经过 B 发送给 consumer。所以 consumer 应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理 Queue。否则无论 consumer 连 node-1 或 node-2,出口总在 node-1,会产生瓶颈。当 node-1 节点故障后,node-2 节点无法取到 node-1 节点中还未消费的消息实体。如果做了消息持久化,那么得等 node-1 节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。
镜像模式:
一般配合HAProxy配置为高可用集群,把需要的队列做成镜像队列,存在与多个节点属于 RabbitMQ 的 HA 方案。该模式解决了普通模式中的问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用。

环境准备

3 台 CentOS Linux release 7.6.1810 (Core) :

  • 192.168.0.3 (定义为中心节点,让其他两台加入,节点都是平等的可以选任意一台。)
  • 192.168.0.4
  • 192.168.0.5

三台机器都编辑/etc/hosts映射主机名:

cat >> /etc/hosts << EOF
192.168.0.3 node-1
192.168.0.4 node-2
192.168.0.5 node-3
EOF

更新系统并安装依赖包:

yum -y update
yum -y install gcc gcc-c++ glibc-devel autoconf make ncurses-devel openssl-devel xmlto socat unixODBC unixODBC-devel

关闭防火墙和selinux:

systemctl disable firewalld && systemctl stop firewalld
setenforce 0
sed -i 's/^SELINUX=.*$/SELINUX=disabled/g' /etc/selinux/config

安装erlang

查看 erlang 和 rabbitmq 版本支持关系:https://www.rabbitmq.com/which-erlang.html,下载地址:https://github.com/erlang/otp/releases

wget https://github.com/erlang/otp/archive/OTP-21.3.7.1.tar.gz
tar zxvf OTP-21.3.7.1.tar.gz && cd otp-OTP-21.3.7.1
./otp_build autoconf
./configure --prefix=/usr/local/erlang --without-javac
make
make install

编辑/etc/profile

export ERLANG_HOME=/usr/local/erlang
export PATH=$PATH:$ERLANG_HOME/bin
. /etc/profile
[root@node-1 ~]# erl
Erlang/OTP 21 [erts-10.3.4] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1] [hipe]
Eshell V10.3.4  (abort with ^G)
1>

二进制包方式安装RabbitMQ

在三台机器上分别安装rabbitmq,下载安装地址:https://dl.bintray.com/rabbitmq/all/rabbitmq-server,这里下载的是rabbitmq-server-generic-unix-3.7.16.tar.xz

tar xvf rabbitmq-server-generic-unix-3.7.16.tar.xz
mv rabbitmq_server-3.7.16/ /usr/local/rabbitmq

修改/etc/profile

export RABBITMQ_HOME=/usr/local/rabbitmq
export PATH=$PATH:$RABBITMQ_HOME/sbin
. /etc/profile
ln -s /usr/local/rabbitmq/sbin/* /usr/local/sbin/
rabbitmq-server -detached       # 后台启动
rabbitmqctl stop                # 停止
rabbitmqctl status              # 查看状态
rabbitmqctl environment         # 查看有效的节点配置
rabbitmqctl node_health_check   # 对本地节点的运行状况检查

账号配置

安装启动后其实还不能在其它机器访问,rabbitmq 默认的 guest 账号只能在本地机器访问,如果想在其它机器访问必须配置其它账号:

# 创建管理员用户,负责整个MQ的运维
rabbitmqctl add_user admin pwd-for-admin
# 赋予其administrator角色
rabbitmqctl set_user_tags admin administrator
# 使用户admin具有/这个virtual host中所有资源的配置、写、读权限以便管理其中的资源
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# 查看用户和角色
rabbitmqctl list_users
# 启动 rabbitmq 内置 web 插件, 管理 rabbitmq 账号等信息
rabbitmq-plugins enable rabbitmq_management

启动后可以使用:http://ip:15672访问web界面,以上三台机器同样的操作,安装启动之后此时每台机器运行着单机模式的RabbitMQ

配置 RabbitMQ 集群

Erlang Cookie

RabbitMQ 底层是通过 Erlang 架构来实现的,所以 rabbitmqctl 会启动 Erlang 节点,并基于 Erlang 节点来使用 Erlang 系统连接 RabbitMQ 节点,在连接过程中需要正确的 Erlang Cookie 和节点名称,Erlang 节点通过交换 Erlang Cookie 以获得认证。
Erlang Cookie 是保证不同节点可以相互通信的密钥,要保证集群中的不同节点相互通信必须共享相同的 Erlang Cookie。rpm安装存放在/var/lib/rabbitmq/.erlang.cookie,二进制安装存放在/root/.erlang.cookie,权限为 owner 只读。

普通集群创建

三台机器都安装好rabbitmq之后,把主节点的.erlang.cookie复制到各节点:
先在 node-2、node-3 备份原有的.erlang.cookie文件,然后先停止 rabbitmq 服务,如果不先停服 cookie 文件被修改后,后面操作无法正常停止。

cp /root/.erlang.cookie /root/.erlang.cookie.bak
rabbitmqctl stop

在 node-1 复制.erlang.cookie文件到各节点:

scp /root/.erlang.cookie root@node-2:/root/
scp /root/.erlang.cookie root@node-3:/root/

所有节点设置权限:

chmod 400 /root/.erlang.cookie

在 node-2、node-3 在启动 rabbitmq 服务:

rabbitmq-server -detached

在node-2、node-3节点执行下面命令,加入集群

rabbitmqctl stop_app    # 停止 rabbitmq 应用,没有停止 Erlang VM
rabbitmqctl reset       # 退出集群并恢复初始状态
# 默认是磁盘节点,这里需要已内存节点(--ram)需要加入集群
rabbitmqctl join_cluster --ram rabbit@node-1
rabbitmqctl start_app   # 开启 rabbitmq 应用

查看群集状态:

rabbitmqctl cluster_status

现在可以用过网页控制台查看集群状态:
图片[1]-消息队列 RabbitMQ 单机模式、集群、镜像集群+负载均衡-岸边IBIAN

镜像集群创建

使用 rabbitmq 作为消息服务时,在服务负载不是很大的情况下,一般我们只需要一个 rabbitmq 节点便能为我们提供服务,可这难免会发生单点故障,要解决这个问题,我们便需要配置 rabbitmq 的集群和镜像。
镜像集群是在普通集群的基础上执行一条设置镜像队列策略的命令即可,在任意一个节点上执行:

rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

参数解释:

  • ha-all为策略名称。
  • ^为匹配符,只有一个^代表匹配所有,^zlh为匹配名称为zlh的 exchanges 或者 queue。
  • ha-mode为匹配类型,分为 3 种模式:
      all所有(所有的queue)
      exctly部分(需配置ha-params参数,此参数为int类型比如3,众多集群中的随机3台机器)
      nodes指定(需配置ha-params参数,此参数为数组类型比如["rabbit@node-1","rabbit@node-2",,"rabbit@node-3"]这样指定为3台机器。)

查看策略:

rabbitmqctl list_policies

集群的常见运维操作

若某节点从集群退出,重置回独立节点,操作命令:

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

若某节点需要停机维护,操作命令:

rabbitmqctl stop           # 先停止服务
rabbitmq-server -detached  # 维护完成后开启服务即可

若要修改某个节点类型,操作命令:

rabbitmqctl stop_app
rabbitmqctl change_cluster_node_type disc

若要从集群种移除某各节点(最好使用第一种退出集群方法):

rabbitmqctl -n rabbit@node-3 stop_app
rabbitmqctl forget_cluster_node rabbit@node-3

配置RabbitMQ的负载均衡

完成镜像队列设置之后,每各队列会被复制到各个节点,各个节点状态保持一致。因为 RabbitMQ 本身不提供负载均衡,需要搭建负载均衡器来提供负载转发,可以选择 HAProxy 和 Nginx。

HAProxy 负载方案

编译安装 PCRE

wget https://ftp.pcre.org/pub/pcre/pcre-8.42.tar.gz
tar zxvf pcre-8.42.tar.gz && cd pcre-8.42
./configure && make && make install

编译安装 HAProxy

# 下载后编译安装
wget https://www.haproxy.org/download/1.8/src/haproxy-1.8.20.tar.gz
tar zxvf haproxy-1.8.20.tar.gz && cd haproxy-1.8.20/
make TARGET=linux2628 ARCH=x86_64 PREFIX=/usr/local/haproxy USE_PCRE=1
make install PREFIX=/usr/local/haproxy
ln -s /usr/local/haproxy/sbin/haproxy /usr/sbin/
# 创建配置文件目录:
mkdir /etc/haproxy
# 创建用户:
useradd -s /sbin/nologin -M haproxy

HAProxy 配置文件/etc/haproxy/haproxy.cfg

global
    log      127.0.0.1   local0  notice
    log      127.0.0.1   local1  info
    pidfile  /var/run/haproxy.pid
    maxconn  4096
    user     haproxy
    group    haproxy
    daemon
defaults
    log      global
    option   dontlognull
    option   httpclose
    option   redispatch
    balance  roundrobin
    maxconn  4096
    timeout  connect 5s
    timeout  client 5s
    timeout  server 3s
    timeout  check 5s
    retries  3

# 统计页面配置
listen admin_stats
    bind     0.0.0.0:8100
    stats    enable
    mode     http
    log      global
    stats    uri  /stats
    stats    realm Haproxy Statistics
    stats    auth  admin:admin
    stats    admin if TRUE
    stats    refresh 20s

# RabbitMQ 管理页面
listen rabbitmq_admin
    bind     0.0.0.0:8010
    mode     http
    server   node-1 node-1:15672
    server   node-2 node-2:15672
    server   node-3 node-3:15672

# RabbitMQ 服务
listen rabbitmq_cluster
    bind     0.0.0.0:8020
    mode     tcp
    option   tcplog
    balance  roundrobin
    timeout  client 3h
    timeout  server 3h
    server   node-1 node-1:5672 check inter 5000 rise 2 fall 3
    server   node-2 node-2:5672 check inter 5000 rise 2 fall 3
    server   node-3 node-3:5672 check inter 5000 rise 2 fall 3

haproxy 常用命令:

# 检查配置文件语法
haproxy -c -f /etc/haproxy/haproxy.cfg
# 启动 haproxy
haproxy -f /etc/haproxy/haproxy.cfg
# restart
haproxy -f /etc/haproxy.cfg -st `cat /var/run/haproxy.pid`
# reload
haproxy -f /etc/haproxy.cfg -sf `cat /var/run/haproxy.pid`

Nginx 负载方案

nginx安装方法略过,版本需要大于1.9.0,且编译时配置--with-stream参数。
nginx配置文件:

worker_processes auto;
error_log  logs/nginx_error.log  crit;
pid        logs/nginx.pid;
worker_rlimit_nofile 51200;
events {
    use epoll;
    worker_connections 51200;
    multi_accept on;
}
stream {
    upstream rabbitmq {
        server 192.168.0.3:5672;
        server 192.168.0.4:5672;
        server 192.168.0.5:5672;
    }
    server {
        listen 5678;
        proxy_connect_timeout 1s;
        proxy_timeout 600s;
        proxy_pass rabbitmq;
    }
}

以上用 haproxy 或 nginx 实现了 rabbitmq 的负载均衡,如果要实现高可用还需要配置 Keepalived 实现

测试rabbitmq集群

安装pika模块:

pip install pika

创建生产者 producer.py 文件:

# -*- coding: utf-8 -*-
import pika
credentials = pika.PlainCredentials('admin', 'pwd-for-admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.0.3', 5678, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='this is a test message.')
print("开始队列...")
connection.close()

创建消费者 consumer.py 文件:

# -*- coding: utf-8 -*-
import pika
credentials = pika.PlainCredentials('admin', 'pwd-for-admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.0.3', 5678, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')
# 回调函数,收到消息后回调
def callback(ch, method, properties, body):
    print("消费列队消息: %s" % body)
channel.basic_consume(queue='hello',
                      on_message_callback=callback,
                      auto_ack=True)
print('正在等待消息...')
channel.start_consuming()

查看rabbitmq列队和属性:

rabbitmqctl list_queues

RabbitMQ 的用户权限 VirtualHost 和角色概念补充

像 MySQL 有数据库的概念并且可以指定用户对库和表等操作的权限。那 RabbitMQ 呢?RabbitMQ 也有类似的权限管理。在 RabbitMQ 中可以虚拟消息服务器 VirtualHost,每个 VirtualHost 相当月一个相对独立的 RabbitMQ 服务器,每个 VirtualHost 之间是相互隔离的,exchange、queue、message 不能互通。
在 RabbitMQ 中无法通过 AMQP 创建 VirtualHost,可以通过rabbitmqctl add_vhost [vhostname]命令来创建。
通常在权限管理中主要包含三步:
1、新建用户

rabbitmqctl add_user superrd pwd-for-superrd

2、配置权限

set_permissions [-p ] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

其中".*"的位置分别用正则表达式来匹配特定的资源,如'^(amq.gen.*|amq.default)$'可以匹配 server 生成的和默认的 exchange,'^$'不匹配任何资源。"/"代表 virtual host 为"/",把这个"/"理解成字符串就行。
查看用户权限命令:

rabbitmqctl list_user_permissions admin
rabbitmqctl list_permissions -p /

3、配置角色

rabbitmqctl set_user_tags [user] [role]

RabbitMQ 中的角色分为如下五类:none、management、policymaker、monitoring、administrator
例如创建监控用户,负责整个MQ的监控:

rabbitmqctl add_user mtuser pwd-for-mtuser
rabbitmqctl set_user_tags mtuser monitoring

官方解释如下:

  • none 不能访问 management plugin
  • management 用户可以通过 AMQP 做的任何事外加:
    列出自己可以通过 AMQP 登入的 virtual hosts
    查看自己的 virtual hosts 中的 queues, exchanges 和 bindings
    查看和关闭自己的 channels 和 connections
    查看有关自己的 virtual hosts 的“全局”的统计信息,包含其他用户在这些 virtual hosts 中的活动。
  • policymaker management 可以做的任何事外加:
    查看、创建和删除自己的 virtual hosts 所属的 policies 和 parameters
  • monitoring management 可以做的任何事外加:
    列出所有 virtual hosts,包括他们不能登录的 virtual hosts
    查看其他用户的 connections 和 channels
    查看节点级别的数据如 clustering 和 memory 使用情况
    查看真正的关于所有 virtual hosts 的全局的统计信息
  • administrator policymaker 和 monitoring 可以做的任何事外加:
    创建和删除 virtual hosts
    查看、创建和删除 users
    查看创建和删除 permissions
    关闭其他用户的 connections
THE END
点赞0赞赏 分享
抢沙发
头像
提交
头像

昵称

取消
昵称表情

    暂无评论内容