Skip to content

activemq笔记

官网

https://activemq.apache.org/

端口

  • 61616
  • 8161

操作

sh
# 启动
start
# 停止
stop
# 查看端口占用
lsof -i:61616
netstat -app|grep 61616
# 查看进程
ps -ef | grep activemq | grep -v grep

基本步骤

图解

queue

生产者

java
//创建工厂
ConnectionFactory factory  = new ActiveMQConnectionFactory("tcp://192.168.111.128:61616");

//创建链接
Connection connection = factory.createConnection();

//启动链接
connection.start();

//创建session,第一个参数为是否开启事务,第二个参数为确认机制
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

//创建目标地址(Queue:点对点消息,Topic:发布订阅消息)注意,生产者和消费者这里得一样
Queue queue = session.createQueue("queue01");

//创建消息生产者
MessageProducer producer = session.createProducer(queue);

//创建消息
TextMessage textMessage = session.createTextMessage("test message");

//发送消息
producer.send(textMessage);

//释放资源
session.close();
connection.close();

消费者

java
//创建工厂
ConnectionFactory factory  = new ActiveMQConnectionFactory("tcp://192.168.111.128:61616");
//创建链接
Connection connection = factory.createConnection();
//启动链接
connection.start();
//创建session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目标地址(Queue:点对点消息,Topic:发布订阅消息)注意,生产者和消费者这里得一样
Queue queue = session.createQueue("queue01");
//创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
//设置消息监听器来接收消息
consumer.setMessageListener(new MessageListener() {
    //处理消息
    @Override
    public void onMessage(Message message) {
        if(message instanceof TextMessage){
            TextMessage textMessage = (TextMessage)message
            try {
                System.out.println("接收的消息(2):"+textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
});
//注意:在监听器的模式下千万不要关闭连接,一旦关闭,消息无法接收

topic

生产者

java
//创建工厂
ConnectionFactory factory  = new ActiveMQConnectionFactory("tcp://192.168.111.128:61616");
//创建链接
Connection connection = factory.createConnection();
//启动链接
connection.start();
//创建session,第一个参数为是否开启事务,第二个参数为确认机制
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目标地址(Queue:点对点消息,Topic:发布订阅消息)注意:生产者和消费者这里得一样
Topic topic = session.createTopic("topic01");
//创建消息生产者
MessageProducer producer = session.createProducer(topic);
//创建消息
TextMessage textMessage = session.createTextMessage("test message--topic");
//发送消息
producer.send(textMessage);
//释放资源
session.close();
connection.close();

消费者

java
//创建工厂
ConnectionFactory factory  = new ActiveMQConnectionFactory("tcp://192.168.111.128:61616");
//创建链接
Connection connection = factory.createConnection();
//启动链接
connection.start();
//创建session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目标地址(Queue:点对点消息,Topic:发布订阅消息)注意,生产者和消费者这里得一样
Topic topic = session.createTopic("topic01");
//创建消息消费者
MessageConsumer consumer = session.createConsumer(topic);
//设置消息监听器来接收消息
consumer.setMessageListener(new MessageListener() {
    //处理消息
    @Override
    public void onMessage(Message message) {
        if(message instanceof TextMessage){
            TextMessage textMessage = (TextMessage)message
            try {
                System.out.println("接收的消息--topic:"+textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
});

注意:一定要先启动消费者,然后再启动生产者,否则消费者无法接收到消费者启动之前的消息

springboot整合

依赖

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

配置项

yml
server:
  port: 9001 #端口
spring:
  application:
    name: activemq-producer # 服务名称

# springboot与activemq整合配置
  activemq:
    broker-url: tcp://192.168.111.128:61616 # 连接地址
    user: admin # activemq用户名
    password: admin :# activemq密码
    packages:
      trust-all: true # 让ActiveMQ信任全部自定义对象,实现对象的序列化或反序列化

# 指定发送模式 (点对点 false , 发布订阅 true)
  jms:
    pub-sub-domain: false
    template:
      delivery-mode: persistent  # 持久化配置(消息存储在日志文件里面)

# 自己定义目标名称(队列或主题)
activemq:
  name: springboot_queue

生产者

配置文件

注意

ActiveMQ5.12后 ,为了安全考虑,ActiveMQ默认不接受自定义的序列化对象,需要将自定义的加入到受信任的列表。

测试类

建议

建议使用jmsTemplate来操作,当然用JmsMessagingTemplate也行

消费者

配置文件

注意

ActiveMQ5.12后 ,为了安全考虑,ActiveMQ默认不接受自定义的序列化对象,需要将自定义的加入到受信任的列表。

监听类

JMS消息体

TextMessage

一个字符串对象

生产者

消费者

MapMessage

一套名称-值对

生产者

消费者

ObjectMessage

一个序列化的 Java 对象

生产者

消费者

注意

ActiveMQ5.12后 ,为了安全考虑,ActiveMQ默认不接受自定义的序列化对象,需要将自定义的加入到受信任的列表。

BytesMessage

一个字节的数据流

生产者

消费者

StreamMessage

Java原始值的数据流

生产者

消费者

消息持久化

可以选的配置

上次用的写法

简单的业务

只要求接收和发送topic消息

消费者

用监听注解

配置

生产者

配置

麻烦的业务

需求:需要可以接收和发送queue消息和topic消息

配置文件

配置类

监听类

定时将发送失败的信息取出来发送

borker