rabbitmq笔记
工作原理

简单的用法
生产者创建工厂——>创建链接——>从链接中拿到channel——>用channel调用队列声明——>用channel调用basicPublish发送消息
端口
- 管理界面默认端口:15672
- 后端发消息的端口:5672
- 集群端口:25672
安装
依赖
erlang
rpm -ivh erlang-21.3-1.el7.x86_64.rpmsocat
yum install socat -y主体
rabbitmq-server
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm常用命令
添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on启动服务
/sbin/service rabbitmq-server start查看服务状态
/sbin/service rabbitmq-server status停止服务(选择执行)
/sbin/service rabbitmq-server stop开启 web 管理插件(前端管理界面) 注意:需要先停止rabbitmq的服务
rabbitmq-plugins enable rabbitmq_management前端管理界面
创建账号看下面,默认账号只能在安装的机器上登录 默认用户只能通过locahost访问
添加一个新的用户
查看当前所有用户和角色
rabbitmqctl list_users创建账号
用户名为admin,密码为123
rabbitmqctl add_user admin 123设置用户角色
给admin这个用户赋予管理员权限
rabbitmqctl set_user_tags admin administrator设置用户权限
提示
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>使用
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"解释
用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
也可以在前台管理界面添加(如果能登录的话)
基本使用
queue
图解
生产者
创建链接工厂
ConnectionFactory factory = new ConnectionFactory();设置工厂参数
factory.setHost("192.168.111.128");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest创建连接 Connection
Connection connection = factory.newConnection();创建Channel
Channel channel = connection.createChannel();声明队列
channel.queueDeclare("hello_world",true,false,false,null);声明消息
如果有现成的就不用这步了
String body = "hello rabbitmq~~~";发送消息
channel.basicPublish("","hello_world",null,body.getBytes());释放资源
channel.close();
connection.close();

消费者
创建链接工厂
ConnectionFactory factory = new ConnectionFactory();设置工厂参数
factory.setHost("192.168.111.128");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest创建连接 Connection
Connection connection = factory.newConnection();创建Channel
Channel channel = connection.createChannel();声明队列
channel.queueDeclare("hello_world",true,false,false,null);接收消息
创建消费者(需要实现handleDelivery方法)
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};接收
channel.basicConsume("hello_world",true,consumer);发布订阅模式
Fanout
图解
使用
通用部分(创建工厂——>设置参数——>创建链接——>创建channel)
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.111.128");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();生产者
创建交换机(交换机模式设为BuiltinExchangeType.FANOUT)
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);
创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");发送消息
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
channel.basicPublish(exchangeName,"",null,body.getBytes());释放资源
channel.close();
connection.close();消费者
创建消费者对象,实现handledelivery方法
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台.....");
}
};接收消息
- 声明接收的队列名字(也可以直接传值到下面的第一个参数)
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";- 接收消息
channel.basicConsume(queue1Name,true,consumer);Routing
图解
使用
优点
根据RoutingKey来匹配发送的消息和接收的消息(当交换机发送的消息的RoutingKey和队列接收的RoutingKey相同时,队列才能接收到消息,一个队列可以绑定多个RoutingKey)
生产者
通用部分
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.111.128");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();创建交换机 交换机模式设为BuiltinExchangeType.DIRECT
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2"; channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);绑定队列并指定key(key需要与消费者对应才能收到消息)
//队列1绑定 error
channel.queueBind(queue1Name,exchangeName,"error");
//队列2绑定 info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";发送消息
channel.basicPublish(exchangeName,"warning",null,body.getBytes());释放资源
channel.close();
connection.close();消费者
创捷消费者对象 实现消费的方法handleDelivery
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台.....");
}
};接收消息
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.basicConsume(queue2Name,true,consumer);Topic
图解
使用
优点
绑定队列和交换机的时候可以使用通配符绑定 通配符:
- *表示匹配一个
- #表示匹配所有
生产者
通用部分
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.111.128");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();创建交换机 声明交换机类型为BuiltinExchangeType.TOPIC
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
// routing key 系统的名称.日志的级别。
//=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
//注意:这里是用通配符绑定!!!
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");发送消息
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());释放资源
channel.close();
connection.close();消费者
创建消费者对象 实现handledelivery方法
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息存入数据库.......");
}
};接收消息
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.basicConsume(queue1Name,true,consumer);整合springboot
配置文件
spring:
rabbitmq:
host: 192.168.111.128 # ip
port: 5672
username: guest
password: guest
virtual-host: /生产者
配置类
发送消息
消费者
监听类
项目中使用
理解
[!发送] 发送 创建个配置类-->声明队列和交换机-->将队列绑定到交换机上-->绑定的时候要写好需要的routingkey(接收规则) 发消息的时候是发到交换机里,交换机根据routingkey分发给绑定好的队列,只有routingkey相匹配队列才能收到消息
监听
只用监听队列就行,监听器上写对应队列的名字即可

配置类
这样就可以接收到这些消息了
用docker安装rabbitmq
docker创建的rabbitmq踩坑
控制端不显示
在docker部署RabbitMq里里找到开启 web 管理插件(前端管理界面)的方法,然后用下面的方法进入镜像中执行
遇到控制端报500错误

解决方法
# 进入镜像
docker exec -it rabbitmq bash
# 进到配置目录
cd /etc/rabbitmq/conf.d/
#把前面的那个值写到后面这个配置文件中
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
#退出镜像bash
exit
# 重启镜像
docker restart rabbitmq端口被占用
将端口映射到其他端口 






发送消息 

这样就可以接收到这些消息了 