Skip to content

rabbitmq笔记

工作原理

简单的用法

生产者创建工厂——>创建链接——>从链接中拿到channel——>用channel调用队列声明——>用channel调用basicPublish发送消息

端口

  • 管理界面默认端口:15672
  • 后端发消息的端口:5672
  • 集群端口:25672

安装

依赖

erlang

sh
rpm -ivh erlang-21.3-1.el7.x86_64.rpm

socat

sh
yum install socat -y

主体

rabbitmq-server

sh
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

常用命令

添加开机启动 RabbitMQ 服务

sh
chkconfig rabbitmq-server on

启动服务

sh
/sbin/service rabbitmq-server start

查看服务状态

sh
/sbin/service rabbitmq-server status

停止服务(选择执行)

sh
/sbin/service rabbitmq-server stop

开启 web 管理插件(前端管理界面) 注意:需要先停止rabbitmq的服务

sh
rabbitmq-plugins enable rabbitmq_management

前端管理界面

创建账号看下面,默认账号只能在安装的机器上登录 默认用户只能通过locahost访问

添加一个新的用户

查看当前所有用户和角色

sh
rabbitmqctl list_users

创建账号

用户名为admin,密码为123

sh
rabbitmqctl add_user admin 123

设置用户角色

给admin这个用户赋予管理员权限

sh
rabbitmqctl set_user_tags admin administrator

设置用户权限

提示

sh
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

使用

sh
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

解释

用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限

也可以在前台管理界面添加(如果能登录的话)

基本使用

queue

图解

生产者

创建链接工厂
java
ConnectionFactory factory = new ConnectionFactory();
设置工厂参数
java
factory.setHost("192.168.111.128");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("heima");//用户名 默认 guest
        factory.setPassword("heima");//密码 默认值 guest
创建连接 Connection
java
Connection connection = factory.newConnection();
创建Channel
java
Channel channel = connection.createChannel();
声明队列
java
channel.queueDeclare("hello_world",true,false,false,null);
声明消息

如果有现成的就不用这步了

java
String body = "hello rabbitmq~~~";
发送消息
java
channel.basicPublish("","hello_world",null,body.getBytes());
释放资源
java
channel.close();
connection.close();

消费者

创建链接工厂
java
ConnectionFactory factory = new ConnectionFactory();
设置工厂参数
java
factory.setHost("192.168.111.128");//ip  默认值 localhost
    factory.setPort(5672); //端口  默认值 5672
    factory.setVirtualHost("/itcast");//虚拟机 默认值/
    factory.setUsername("heima");//用户名 默认 guest
    factory.setPassword("heima");//密码 默认值 guest
创建连接 Connection
java
Connection connection = factory.newConnection();
创建Channel
java
Channel channel = connection.createChannel();
声明队列
java
channel.queueDeclare("hello_world",true,false,false,null);
接收消息

创建消费者(需要实现handleDelivery方法)

java
Consumer consumer = new DefaultConsumer(channel){
    /*
        回调方法,当收到消息后,会自动执行该方法
        1. consumerTag:标识
        2. envelope:获取一些信息,交换机,路由key...
        3. properties:配置信息
        4. body:数据
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumerTag:"+consumerTag);
        System.out.println("Exchange:"+envelope.getExchange());
        System.out.println("RoutingKey:"+envelope.getRoutingKey());
        System.out.println("properties:"+properties);
        System.out.println("body:"+new String(body));
    }
};

接收

java
channel.basicConsume("hello_world",true,consumer);

发布订阅模式

Fanout

图解

使用

通用部分(创建工厂——>设置参数——>创建链接——>创建channel)

java
//1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.111.128");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("heima");//用户名 默认 guest
        factory.setPassword("heima");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
生产者

创建交换机(交换机模式设为BuiltinExchangeType.FANOUT)

java
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);

创建队列

java
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);

绑定队列和交换机

java
/*
queueBind(String queue, String exchange, String routingKey)
参数:
    1. queue:队列名称
    2. exchange:交换机名称
    3. routingKey:路由键,绑定规则
        如果交换机的类型为fanout ,routingKey设置为""
 */
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");

发送消息

java
String body = "日志信息:张三调用了findAll方法...日志级别:info...";

 channel.basicPublish(exchangeName,"",null,body.getBytes());

释放资源

java
channel.close();
connection.close();
消费者

创建消费者对象,实现handledelivery方法

java
Consumer consumer = new DefaultConsumer(channel){
    /*
        回调方法,当收到消息后,会自动执行该方
        1. consumerTag:标识
        2. envelope:获取一些信息,交换机,路由key...
        3. properties:配置信息
        4. body:数
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      /*  System.out.println("consumerTag:"+consumerTag);
        System.out.println("Exchange:"+envelope.getExchange());
        System.out.println("RoutingKey:"+envelope.getRoutingKey());
        System.out.println("properties:"+properties);*/
        System.out.println("body:"+new String(body));
        System.out.println("将日志信息打印到控制台.....");
    }
};

接收消息

  • 声明接收的队列名字(也可以直接传值到下面的第一个参数)
java
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
  • 接收消息
java
channel.basicConsume(queue1Name,true,consumer);

Routing

图解

使用

优点

根据RoutingKey来匹配发送的消息和接收的消息(当交换机发送的消息的RoutingKey和队列接收的RoutingKey相同时,队列才能接收到消息,一个队列可以绑定多个RoutingKey)

生产者

通用部分

java
//1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.111.128");//ip  默认值 localhost
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/itcast");//虚拟机 默认值/
        factory.setUsername("heima");//用户名 默认 guest
        factory.setPassword("heima");//密码 默认值 guest
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();

创建交换机 交换机模式设为BuiltinExchangeType.DIRECT

java
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);

创建队列

java
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2"; channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);

绑定队列并指定key(key需要与消费者对应才能收到消息)

java
//队列1绑定 error
    channel.queueBind(queue1Name,exchangeName,"error");
    //队列2绑定 info  error  warning
    channel.queueBind(queue2Name,exchangeName,"info");
    channel.queueBind(queue2Name,exchangeName,"error");
    channel.queueBind(queue2Name,exchangeName,"warning");
    String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";

发送消息

java
channel.basicPublish(exchangeName,"warning",null,body.getBytes());

释放资源

java
channel.close();
connection.close();
消费者

创捷消费者对象 实现消费的方法handleDelivery

java
Consumer consumer = new DefaultConsumer(channel){
    /*
        回调方法,当收到消息后,会自动执行该方
        1. consumerTag:标识
        2. envelope:获取一些信息,交换机,路由key...
        3. properties:配置信息
        4. body:数
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      /*  System.out.println("consumerTag:"+consumerTag);
        System.out.println("Exchange:"+envelope.getExchange());
        System.out.println("RoutingKey:"+envelope.getRoutingKey());
        System.out.println("properties:"+properties);*/
        System.out.println("body:"+new String(body));
        System.out.println("将日志信息打印到控制台.....");
    }
};

接收消息

java
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";

channel.basicConsume(queue2Name,true,consumer);

Topic

图解

使用

优点

绑定队列和交换机的时候可以使用通配符绑定 通配符:

  • *表示匹配一个
  • #表示匹配所有
生产者

通用部分

java
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("192.168.111.128");//ip  默认值 localhost
factory.setPort(5672); //端口  默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();

创建交换机 声明交换机类型为BuiltinExchangeType.TOPIC

java
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);

创建队列

java
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);

绑定队列和交换机

java
/*
queueBind(String queue, String exchange, String routingKey)
参数:
    1. queue:队列名称
    2. exchange:交换机名称
    3. routingKey:路由键,绑定规则
        如果交换机的类型为fanout ,routingKey设置为""
 */
 // routing key  系统的名称.日志的级别。
//=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
//注意:这里是用通配符绑定!!!
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");

发送消息

java
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());

释放资源

java
channel.close();
connection.close();
消费者

创建消费者对象 实现handledelivery方法

java
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
    /*
        回调方法,当收到消息后,会自动执行该方
        1. consumerTag:标识
        2. envelope:获取一些信息,交换机,路由key...
        3. properties:配置信息
        4. body:数
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      /*  System.out.println("consumerTag:"+consumerTag);
        System.out.println("Exchange:"+envelope.getExchange());
        System.out.println("RoutingKey:"+envelope.getRoutingKey());
        System.out.println("properties:"+properties);*/
        System.out.println("body:"+new String(body));
        System.out.println("将日志信息存入数据库.......");
    }
};

接收消息

java
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";

channel.basicConsume(queue1Name,true,consumer);

整合springboot

配置文件

yml
spring:
  rabbitmq:
    host: 192.168.111.128 # ip
    port: 5672
    username: guest
    password: guest
    virtual-host: /

生产者

配置类 发送消息

消费者

监听类

项目中使用

理解

[!发送] 发送 创建个配置类-->声明队列和交换机-->将队列绑定到交换机上-->绑定的时候要写好需要的routingkey(接收规则) 发消息的时候是发到交换机里,交换机根据routingkey分发给绑定好的队列,只有routingkey相匹配队列才能收到消息

监听

只用监听队列就行,监听器上写对应队列的名字即可

配置类 这样就可以接收到这些消息了

用docker安装rabbitmq

Docker部署rabbitmq

docker创建的rabbitmq踩坑

控制端不显示

docker部署RabbitMq里里找到开启 web 管理插件(前端管理界面)的方法,然后用下面的方法进入镜像中执行

遇到控制端报500错误

解决方法

sh
# 进入镜像
docker exec -it rabbitmq bash
# 进到配置目录
cd /etc/rabbitmq/conf.d/
#把前面的那个值写到后面这个配置文件中
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
#退出镜像bash
exit
# 重启镜像
docker restart rabbitmq

端口被占用

将端口映射到其他端口