- RabbitMQ 入门
- 1RabbitMQ 介绍
RabbitMQ 是信息传输的中间者。本质上,他从生产者(producers)接收消息,转发这些消息给消费者(consumers).换句话说,他能够按根据你指定的规则进行消息转发、缓冲、和持久化。
RabbitMQ 的一些常见的术语:
Producing意味着无非是发送。一个发送消息的程序是一个producer(生产者)。一般用下图表示Producer:
Queue(队列)类似邮箱。依存于RabbitMQ内部。虽然消息通过RabbitMQ在你的应用中传递,但是它们只能存储在queue中。队列不受任何限制,可以存储任何数量的消息—本质上是一个无限制的缓存。很多producers可以通过同一个队列发送消息,相同的很多consumers可以从同一个队列上接收消息。一般用下图表示队列:
Consuming(消费)类似于接收。consumer是基本属于等待接收消息的程序。一般使用下图表示Consumer:
注意:producer(生产者),consumer(消费者),broker(RabbitMQ服务)并不需要部署在同一台机器上,实际上在大多数实际的应用中,也不会部署在同一台机器上。
- 2RabbitMQ工作原理
P——代表消息生产者,一个向交换器发布消息的客户端应用程序
C——代表消息消费者,一个从消息队列中请求消息的客户端应用程序。
X——代表路由交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
红色——消息队列,用来保存消息直到发送给消费者。
客户端与RabbitMQ交互过程:
- 1获取Conection
- 2创建Channel
- 3定义Exchange,Queue
- 4使用一个RoutingKey将Queue Binding到一个Exchange上
- 5通过指定一个Exchange和一个RoutingKey来将消息发送到对应的Queue上,
- 6 接收方在接收时也是获取connection,接着获取channel,然后指定一个Queue直接到它关心的Queue上取消息,它对Exchange,RoutingKey及如何binding都不关心,到对应的Queue上去取消息就OK了
- 3RabbitMQ安装
- 下载Erlang,地址:http://www.erlang.org/download/otp_win32_R15B.exe ,双击安装即可(首先装)
- 下载RabbitMQ,地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.4/rabbitmq-server-3.3.4.exe ,双击安装即可
- 下载rabbit-client.jar ,Java代码时需要导入。地址:http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.3.4/rabbitmq-java-client-bin-3.3.4.zip
- 安装完成后,在RabbitMQ的安装目录的sbin先会有:rabbitmq-server.bat
例如:
在cmd下:进入sbin目录,运行rabbitmq-server start
安装完rabbitMQ可以再启动插件扩展,其中包含了一个管理后台
最新版本的后台地址为 http://localhost:15672/
用户名和密码都为guest,输入完成进入主菜单
功能很丰富,可以查看当前服务器的交换机,队列,消息,连接,会话等得使用情况。
基本上到这里服务器的安装部署环节算是ok,很简单。
- 4AMQP协议
客户端与RabbitMQ连接时,首先要建立和RabbitMQ代理之间的tcp连接。一旦tcp打开,你的应用会创建一个 AMQP channel。这个channel是在“真实”的tcp连接里面的一个“虚拟”的连接,你可以通过channel发送amqp命令。每个channel都有个唯一的ID.如果通过tcp发送amqp命令,则会消耗大量的tcp连接(tcp连接是有限的)。当创建一个线程的时候,会在tcp连接上创建一个channel,这个线程拥有私有的与rabbit沟通的路径,并且不会在系统的tcp栈上添加额外的负荷。一个tcp连接上面可以建立的amqp channel数量是没有限制的。可以把他想象成一捆光纤电缆:
-
- RabbitMQ的消息确认机制
- RabbitMQ将消息投递到客户端后,客户端如果没处理完这个消息就死掉了,这个消息还会不会存在?这取决于RabbitMQ的消息确认机制(Message acknowledgment)是否打开。
- 为了确保消息不会丢失,RabbitMQ支持消息确认机制。客户端在接受到消息并处理完后,可以发送一个ack消息给RabbitMQ,告诉它该消息可以安全的删除了。假如客户端在发送ack之前意外死掉了,那么RabbitMQ会将消息投递到下一个consumer客户端。如果有多个consumer客户端,RabbitMQ在投递消息时是轮询的。
- RabbitMQ如何判断客户端死掉了?唯一根据是客户端连接是否断开。这里没有超时机制,也就是说客户端可以处理一个消息很长时间,只要没断开连接,RabbitMQ就一直等待ack消息。消息确认机制默认是打开的,除非你设置no_ack=True标记来手工关闭它。
-
- RabbitMQ消息传送 的三种模式
- Fanout模式:Exchange会将信息发送到所有与它绑定在一起的队列。
- Direct模式:如果routing key匹配,就将消息发送到对应的队列。
- Topic模式:通过使用匹配符,可以实现将不同来源的信息发送到同一个队列上。
-
Java 入门实例
一个producer发送消息,一个接收者接收消息,并在控制台打印出来。如下图:
其中P代表生产者、C表示消费者、中间红色部分代表消息队列
注:需要在官网下载rabbitmq-java-client-bin-*.zip将jar放入项目的classpath
发送端:Send.java 连接到RabbitMQ(此时服务需要启动),发送一条数据,然后退出。
生产者客户端的发送消息程序如下:
Java代码 编辑
package com.abin.test;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
//队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
/**
* 创建连接连接到rabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ所在主机ip或者主机名
factory.setHost("localhost");
//创建一个连接
Connection connection = factory.newConnection();
//创建一个频道
Channel channel = connection.createChannel();
//指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送的消息
String message = "Hello World!";
//往队列中发出一条消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭频道和连接
channel.close();
connection.close();
}
}
运行结果如下:
[x] Sent 'Hello World!'
值得注意的是队列只会在它不存在的时候创建,多次声明并不会重复创建。信息的内容是字节数组,也就意味着你可以传递任何数据。
接收端:Recv.java 不断等待服务器推送消息,然后在控制台输出。
消费者客户端接收消息程序如下:
Java代码
package com.abin.test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Reqv {
//队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//打开连接和创建频道,与发送端一样
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//创建队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消费队列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
//nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
运行程序得到的结果如下:
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
- RabbitMQ进阶
- Linux 下RabbitMQ软件安装
- 安装前的数据准备(上官网下载即可):
(1)erlang-17.4-1.el6.x86_64.rpm (需要依赖openssl-1.0.1,需确保机器已安装)
(2)xmlto-0.0.26.tar.gz
(3)rabbitmq-server-3.5.4.tar.gz
- 安装前的环境准备
(1)机器(示例搭建主从式,故需两台机器,且需要同网段)
机器名 |
IP地址 |
系统 |
版本 |
hostA |
99.12.73.239 |
Linux |
Red Hat Enterprise Linux Server release 6.4 (Santiago) |
hostB |
99.12.73.240 |
Linux |
Red Hat Enterprise Linux Server release 6.4 (Santiago) |
(2)设置hosts
①连接hostA,输入如下命令进行设置
# view /etc/sysconfig/network (查看系统主机名称HOSTNAME)
# echo $HOSTNAME (显示主机名,应与上一项一致)
# vi /etc/hosts (将另一台机器的ip/hostname添加到本机的hosts文
件中,用于DNS通讯解析)
- 进入hosts后,按shift+G,可以直接跳至最后一行
- 然后按o,在最后插入一行
- 输入另一台机器的ip/hostname
- 添加完毕后,按esc,输入":x",回车进行保存
②连接hostB,按①进行设置
软件安装
(1)安装erlang
①将erlang-17.4-1.el6.x86_64.rpm下载好,放到某路径下(本示例放在/usr/)
②输入以下命令进行安装
# cd /usr/
# rpm -ivh erlang-17.4-1.el6.x86_64.rpm
(2)安装xmlto(rabbitmq编译安装时需要使用)
①将xmlto-0.0.26.tar.gz下载好,放到某路径下(本示例放在/usr/)
②输入以下命令进行安装
#cd /usr/
#tar zxvf xmlto-0.0.26.tar.gz
#cd xmlto-0.0.26
#./configure
#make install
(3)安装rabbitmq
①将rabbitmq-server-3.5.4.tar.gz下载好,放到某路径下(本示例放在/usr/)
②输入以下命令进行安装
#tar -zxf rabbitmq-server-3.5.4.tar.gz
#cd rabbitmq-server-3.5.4
#make TARGET_DIR=/usr/local/rabbitmq SBIN_DIR=/usr/local/rabbitmq/sbin MAN_DIR=/usr/local/rabbitmq/man install
- RabbitMQ集群部署
- 添加用户组及用户(hostA\hostB都需要)
#cd ~
#groupadd -g 10001 rabbitmq
( groupmod -g 10001 rabbitmq 若rabbitmq已存在,修改rabbitmq的gid为10001)
( grep rabbitmq /etc/group 查看rabbimq的gid)
#useradd -u 10001 -g 10001 rabbitmq
#mkdir -pv /usr/local/rabbitmq/logs
#mkdir -pv /usr/local/rabbitmq/db
#chown rabbitmq:rabbitmq /usr/local/rabbitmq/logs
#chown rabbitmq:rabbitmq /usr/local/rabbitmq/db
-
-
构建erlang集群环境
-
(Rabbitmq的集群是依赖于erlang的集群来工作的,所以必须先构建起erlang的集群环境。Erlang的集群中各节点是通过一个magic cookie来实现的,这个cookie存放在$HOME/.erlang.cookie 中,文件是400的权限。所以必须保证各节点cookie保持一致,否则节点之间就无法通信。把相同的cookie存放于$HOME/.erlang.cookie文件中。这个文件包含了一个随机字符串,这个字符串是你第一次在该机器上运行Erlang时自动生成的。这个可以先通过./rabbitmq-server启动一个节点,这样在$HOME目录下就会产生这样一个.erlang.cookie文件了。如果你想把一台机器加入到当前的分布式Erlang网络中,就需要把这个文件复制到这台机器上。或者,也可以显式地设置这个值。)
(1)进入主节点机器
# cd ~
# chmod 700 $HOME/.erlang.cookie (修改文件的权限,进行读取)
# more .erlang.cookie
MYWQBZRLFLXNBFRSIAKP (复制那一串字符)
# chmod 400 $HOME/.erlang.cookie (恢复文件原来的权限,不允许读写)
(2)进入从节点机器
# cd ~
# chmod 700 $HOME/.erlang.cookie (修改文件的权限,进行读写)
# cat > .erlang.cookie (修改cookie)
MYWQBZRLFLXNBFRSIAKP (将第(1)步中复制的字符串粘贴进来)
^Z (按ctrl+z完成修改)
# more .erlang.cookie (检查cookie是否正确修改成一致)
# chmod 400 $HOME/.erlang.cookie (恢复文件原来的权限,不允许读写)
-
-
- 启动rabbitmq
-
#cd /usr/local/rabbitmq/sbin
#./rabbitmq-server start
-
-
- 构建rabbitmq集群环境
-
有了erlang的集群环境,rabbitmq集群环境构建起来很简单,只需要把从节点添加到主节点中即可:
(在从节点hostB中进行操作)
# cd /usr/local/rabbitmq/sbin
# ./rabbitmqctl stop_app
# ./rabbitmqctl join_cluster rabbit@hostA (hostA为主节点的主机名称, 执行此步骤之
前确保主节点服务器是开启的)
# ./rabbitmqctl start_app
# ./rabbitmqctl cluster_status (查看集群状态,若主从节点的状态一致,则成功)
(注:template即为hostA,oracle即为hostB)
-
- exchange属性
- type
前一章我们说了exchange的类型分为fanout,direct,topic.还有一种不常用的headers。
headers这种类型的exchange绑定的时候会忽略掉routingkey,Headers是一个键值对,可以定义成成字典等。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。之前的几种exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型
举个例子,发送端定义2个键值{k1,1},{k2,2},接收端绑定队列的时候定义{"x-match", "any"},那么接收端的键值属性里只要存在{k1,1}或{k2,2}都可以获取到消息。
这样的类型扩展的程度很大,适合非常复杂的业务场景
- Durability
持久性,这是exchange的可选属性,如果你Durability设置为false,那些当前会话结束的时候,该exchange也会被销毁。
新建一个transient exchange
关闭当前连接再查看一下
刚才我们新建的transient已经销毁了。
- Auto delete
当没有队列或者其他exchange绑定到此exchange的时候,该exchange被销毁。这个很简单就不示例了。
- Internal
表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定。
注意: 无法声明2个名称相同 但是类型却不同的exchange
-
- Queue 属性
- Durability 和exchange相同,未持久化的队列,服务重启后销毁。
- Auto delete 当没有消费者连接到该队列的时候,队列自动销毁。
- Exclusive 使队列成为私有队列,只有当前应用程序可用,当你需要限制队列只有一个消费者,这是很有用的。
扩展属性如下对应源程序 RabbitMQ.Client.IModel.QueueDeclare(string, bool, bool, bool, System.Collections.Generic.IDictionary<string,object>)最后的参数
- Message TTL 当一个消息被推送在该队列的时候 可以存在的时间 单位为ms,(对应扩展参数argument "x-message-ttl" )
- Auto expire 在队列自动删除之前可以保留多长时间(对应扩展参数argument "x-expires")
- Max length 一个队列可以容纳的已准备消息的数量(对应扩展参数argument "x-max-length")
注意:一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。
-
- Message属性
- Durability
消息的持久在代码中设置的方法与exchange和queue不同,有2种方法
1.
1 2 3 4 |
IBasicProperties properties = channel.CreateBasicProperties(); properties.SetPersistent(true); byte[] payload = Encoding.ASCII.GetBytes(message); channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload); |
2.
1 2 3 4 |
IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; byte[] payload = Encoding.ASCII.GetBytes(message); channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload); |
- contentType: 标识消息内容的MIME,例如JSON用application/json
- replayTo: 标识回调的queue的地址
- correlationId:用于request和response的关联,确保消息的请求和响应的同一性
- Message的2种状态:
- Ready
此状态的消息存在于队列中待处理。
- Unacknowledged
此状态的消息表示已经在处理未确认。
说到Unacknowledged,这里需要了解一个ack的概念。当Consumer接收到消息、处理任务完成之后,会发送带有这个消息标示符的ack,来告诉server这个消息接收到并处理完成。RabbitMQ会一直等到处理某个消息的Consumer的链接失去之后,才确定这个消息没有正确处理,从而RabbitMQ重发这个消息。
Message acknowledgment是默认关闭的。初始化Consumer时有个noAck参数,如果设置为true,这个Consumer在收到消息之后会马上返回ack。
string BasicConsume(string queue, bool noAck, RabbitMQ.Client.IBasicConsumer consumer)
一般来说,常用的场景noack一般就是设置成true,但是对于风险要求比较高的项目,例如支付。对于每一条消息我们都需要保证他的完整性和正确性。就需要获取消息后确认执行完正确的业务逻辑后再主动返回一个ack给server。可以通过rabbitmqctl list_queues name message_rady message_unacknowleded 命令来查看队列中的消息情况,也可以通过后台管理界面。
-
- binding 相关
如果你绑定了一个durable的队列和一个durable的交换机,RabbitMQ会自动保留这个绑定。类似的,如果删除了某个队列或交换机(无论是不是 durable),依赖它的绑定都会自动删除。
在声明一个队列的同时,server会默认让此队列绑定在默认的exchange上,这个exchange的名称为空。
- RabbitMQ实战
- RabbitMQ性能测试
- 发送脚本directSend
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.File;
import java.util.Date;
import java.text.SimpleDateFormat;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import lrapi.lr;
public class Actions
{
private static ConnectionFactory factory;// 初始化RabbitMQ消息参数
private static Connection conn ; // 连接rabbitMQ 初始化
private static Channel ch ; // rabbitMQ 频道初始化
public int init() throws Throwable {
try {
/**
* 创建连接连接到rabbitMQ
*/
factory = new ConnectionFactory();
//设置URI,URI会由项目组提供
factory.setUri("amqp://zwk:kangkang@99.12.73.239:5672/test");
//设置自动重连功能
factory.setAutomaticRecoveryEnabled(true);
conn = factory.newConnection(); //创建一个连接
ch = conn.createChannel(); //创建一个频道
ch.exchangeDeclare("direct_logs", "direct"); // 指定一个转发器
} catch (Exception e ) {
e.printStackTrace();
return 0;
}
return 0;
}//end of init
public static void sleep(int ms) { //模拟发送消息停顿函数,可以精确到毫秒
try {
Thread.sleep(ms);
} catch (InterruptedException ie) {
// no special processing required
}
}
/*
打印发送消息方法,为了定位是否存在丢消息
而写的方法
*/
private static void print2File (String msg){
try {
String dir=Actions.class.getClassLoader().getResource("").getPath();
String logFileName=new SimpleDateFormat("yyyy-MM-dd").format(new Date());
File file=new File(dir,logFileName+".txt");
FileOutputStream fos=new FileOutputStream(file,true);
fos.write((msg+"\r\n").getBytes());
fos.flush();
fos.close();
} catch ( FileNotFoundException e ) {
e.printStackTrace();
}catch(IOException e){
e.printStackTrace();
}
}
public int action() throws Throwable {
try {
ch.confirmSelect();
/*
模拟发送报文的大小,执行需要更改byte 数组大小,就可以模拟
发送任何字节的消息
*/
//---------------------message 组装-------------------------------
byte[] message = new byte[1000];
ByteArrayOutputStream acc = new ByteArrayOutputStream();
byte[] message0 = acc.toByteArray();
System.arraycopy(message0, 0, message, 0, message0.length);
//---------------------message 组装-------------------------------
/*
模拟消息发送,可以定制发送是否落地消息发送
更改
*/
for (long i = 0; i < 3000; ++i) {
lr.start_transaction("direct_send");
sleep(200);
//ch.basicPublish("<exchange_name>", "", MessageProperties.PERSISTENT_BASIC, message); //消息落地
ch.basicPublish("direct_logs", "test", MessageProperties.BASIC, message); //消息不落地
lr.end_transaction("direct_send",lr.AUTO);
//print2File(message+i); //记录发送消息内容
};
ch.waitForConfirmsOrDie();
}
catch (Exception e) {
e.printStackTrace();
return 0;
}
return 0;
}//end of action
public int end() throws Throwable {
ch.close(); //关闭通道
conn.close(); //关闭连接
return 0;
}//end of end
-
-
- 接收脚本:directReceive
- RabbitMQ 性能监控工具
-
(1)启用网页管理页面:
# cd /usr/local/rabbitmq/sbin/
# rabbitmq-plugins enable rabbitmq_management
(2)创建管理页面登录用户:
# cd /etc/rabbitmq/rabbitmq.config (如果没有,则自行创建,用vi rabbitmq.config)
文件congfig的内容:
-----------------------------------
[
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["zwk"]}]}
].
-----------------------------------
# cd /usr/local/rabbitmq/sbin/
# ./rabbitmqctl add_user zwk kangkang (zwk为用户名,kangkang为密码,下同)
# ./rabbitmqctl set_user_tags kang administrator (用户设置为administrator才能远程访问)
#./rabbitmqctl set_permissions -p test zwk ".*" ".*" ".*"
(3)在浏览器中输入99.12.73.239:15672(前提是启动了rabbitMQ服务)
(4)使用第(2)步中创建的用户进行登录,就可以查看具体信息
(5)切换到Queues页签,可以查看到目前队列的情况。另外,还可以自行新增队列(一般rabbitMQ的发送端指定队列时,若该队列不存在,会自动添加)。
(6)切换到exchange页签,可以查看到目前exchange的情况。另外,还可以自行新增exchange
(7)切换到Connection页签,可以查看到目前Connections的连接情况。
(8)切换到Admin页签,可以在此页签下进行设置用户、设置测试策略、设置virtual hosts
的操作。
-
- RabbitMQ后台管理
rabbitmqctl是一个简单的命令行工具用于管理RabbitMQ Server,常用项如下(需先进入/usr/local/rabbitmq/sbin/路径下,再执行下述命令):
(1)启停Server/应用
rabbitmqctl start
rabbitmqctl stoprabbitmqctl start_app
rabbitmqctl stop_app
(2)查看状态
rabbitmqctl status
(3)信息查询
rabbitmqctl list_queues
rabbitmqctl list_exchanges
rabbitmqctl list_bindings
(4)用户管理
用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。
①新增一个用户
rabbitmqctl add_user Username Password
②删除一个用户
rabbitmqctl delete_user Username
③修改用户的密码
rabbitmqctl change_password Username Newpassword
④查看当前用户列表
rabbitmqctl list_users
(5)用户角色
用户角色可分为五类,超级管理员, 监控者, 策略制定者, 普通管理者以及其他。
- 超级管理员(administrator):
可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。
- 监控者(monitoring):
可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
- 策略制定者(policymaker):
可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息。
- 普通管理者(management):
仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。
- 其他(Others):无法登陆管理控制台,通常就是普通的生产者和消费者。
①设置用户角色的命令为:
rabbitmqctl set_user_tags User Tag
User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其他自定义名称)。
②也可以给同一用户设置多个角色,例如
rabbitmqctl set_user_tags hncscwc monitoring policymaker
(6)用户权限
用户权限指的是用户对exchange,queue的操作权限,包括配置权限,读写权限。配置权限会影响到exchange,queue的声明和删除。读写权限影响到从queue里取消息,向exchange发送消息以及queue和exchange的绑定(bind)操作。
例如: 将queue绑定到某exchange上,需要具有queue的可写权限,以及exchange的可读权限;向exchange发送消息需要具有exchange的可写权限;从queue里取数据需要具有queue的可读权限。详细请参考官方文档中"How permissions work"部分。
相关命令为:
①设置用户权限
rabbitmqctl set_permissions -p VHostPath User ConfP WriteP ReadP
②查看(指定hostpath)所有用户的权限信息
rabbitmqctl list_permissions [-p VHostPath]
③查看指定用户的权限信息
rabbitmqctl list_user_permissions User
④清除用户的权限信息
rabbitmqctl clear_permissions [-p VHostPath] User
- 问题与解决
1、群过程中碰到如下错误:
#rabbitmqctl join_cluster rabbit@hostB
Clustering node rabbit@hostA with rabbit@hostB ...
Error: unable to connect to nodes [rabbit@hostB]: nodedown
DIAGNOSTICS
===========
attempted to contact: [rabbit@hostB]
rabbit@hostB:
* unable to connect to epmd (port 4369) on hostB: nxdomain (non-existing domain)
current node details:
- node name: 'rabbitmqctl-8666@hostA'
- home dir: /var/lib/rabbitmq
- cookie hash: 50YO3zK+HJHos0tab1vHjg==
解决方法:
集群节点间需能互相访问,故每个集群节点的hosts文件应包含集群内所有节点的信息以保证互相解析
#vi /etc/hosts
99.12.73.239 hostA
99.12.73.240 hostB
之后重启各节点中的rabbitmq
2、建集群过程中碰到如下错误:
解决之道:
集群节点间cookie需要一致,各节点间才能进行通讯。参照第三章第二点,设置好cookie。
3、Web管理界面无法正确显示
cluster搭建起来后若在web管理工具中rabbitmq_management的Overview的Nodes部分看到“Node statistics not available”的信息,说明在该节点上web管理插件还未启用。
直接在显示提示信息的节点上运行sudo rabbitmq-plugins enable rabbitmq_management即可。
Error: mnesia_unexpectedly_running
原因:忘记先停止stop_app
解决:
#rabbitmqctl stop_app
#rabbitmq-plugins enable rabbitmq_management
4、rabbitmq-server第一次启动后hostname不能被解析或者发生了更改则会导致启动失败
需执行如下操作
#m -rf /var/lib/rabbitmq/mnesia(因为相关信息会记录在此数据库)
重装RabbitMQ Server
5、发现多并发下,同时将收发消息,一段时间后,收发程序的TPS 下降得比较厉害,而 接收脚本的 处理能力会逐步降为零,而发送程序继续发送消息
问题分析:
- 以线程方式运行1,检查发现10并发压测时,只有一个连接时活动,其它9个连接都是空闲状态。
- 以进程方式运行时,检查发现10个并发压测时,10个连接都是活动状态
解决方案:
- 修改脚本,把通过Action 迭代收发消息,改成通过放在Action 中使用循环函数,收发消息。
- 运行过程中使用进程的方式运行。
6、PaaS平台下 网络与LR 负载机之前网络不稳定且存在3ms 的网络延迟
问题分析:
- 通过ping 命令 ping 99.12.104.86 服务器,查看网络延迟
- 而本身rabbiMQ发送一个消息响应时间都在1ms 以内,3ms 的网络延迟对测试RabbitMQ的消息速率的结果影响甚大。
解决方案:
- 新增一台负载机(99.12.188.158 )。
- Ping 99.12.104.86 机器,查看延迟都在1ms 以内
7、在高可用集群模式下,采用rabbitMQ java client提供的自动重连方法,在Consumer 中设置为消息接收确认机制的情况下,应用端有无法收到确认消息的
建议:不要采用RabbitMQ java client提供的自动重连方法,通过APP端实现自动重连的方法