ActiveMQ - 基础篇

网友投稿 301 2022-11-19

ActiveMQ - 基础篇

一、前言

二、概念

中间件:

非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件。

消息中间件:

关注与数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统。

三、规范

四、ActiveMQ

4.1、下载安装(Win)

Ps1:直接启动:找到 activemq.bat 启动(推荐管理员方式运行)。

Ps2:使用服务启动:找到 InstallService.bat 启动(推荐管理员方式运行)。

4.2、队列模式

AppProducer 类

package com.myimooc.jms.queue;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/** * App 生产者-队列模式 */public class AppProducer { // 指定ActiveMQ服务的地址 private static final String URL = "tcp://127.0.0.1:61616"; // 指定队列的名称 private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.创建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.创建Connection Connection connection = connectionFactory.createConnection(); // 3.启动连接 connection.start(); // 4.创建会话(第一个参数:是否在事务中处理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 创建一个目标 Destination destination = session.createQueue(QUEUE_NAME); // 6.创建一个生产者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 7.创建消息 TextMessage textMessage = session.createTextMessage("test" + i); // 8.发布消息 producer.send(textMessage); System.out.println("消息发送:" + textMessage.getText()); } // 9.关闭连接 connection.close(); } }

AppConsumer 类

package com.myimooc.jms.queue;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/** * App 消费者-队列模式 */public class AppConsumer { /** 指定ActiveMQ服务的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定队列的名称 */ private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.创建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.创建Connection Connection connection = connectionFactory.createConnection(); // 3.启动连接 connection.start(); // 4.创建会话(第一个参数:是否在事务中处理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.创建一个目标 Destination destination = session.createQueue(QUEUE_NAME); // 6.创建一个消费者 MessageConsumer consumer = session.createConsumer(destination); // 7.创建一个监听器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { System.out.println("接收消息异常:"); e.printStackTrace(); } } }); // 8.关闭连接 //connection.close(); } }

JMS队列模式:

消息发布出去后,只要有消费者消费就算OK,不存在消费者要先订阅(启动监听)的问题。

4.3、主题模式

AppProducer 类

package com.myimooc.jms.topic;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/** * App 生产者-主题模式 */public class AppProducer { /** 指定ActiveMQ服务的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定主题的名称 */ private static final String TOPIC_NAME = "topic-test"; public static void main(String[] args) throws JMSException { // 1.创建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.创建Connection Connection connection = connectionFactory.createConnection(); // 3.启动连接 connection.start(); // 4.创建会话(第一个参数:是否在事务中处理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 创建一个目标 Destination destination = session.createTopic(TOPIC_NAME); // 6.创建一个生产者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 7.创建消息 TextMessage textMessage = session.createTextMessage("test" + i); // 8.发布消息 producer.send(textMessage); System.out.println("消息发送:" + textMessage.getText()); } // 9.关闭连接 connection.close(); } }

AppConsumer 类

package com.myimooc.jms.topic;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/** * App 消费者-主题模式 */public class AppConsumer { /** 指定ActiveMQ服务的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定主题的名称 */ private static final String TOPIC_NAME = "topic-test"; public static void main(String[] args) throws JMSException { // 1.创建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.创建Connection Connection connection = connectionFactory.createConnection(); // 3.启动连接 connection.start(); // 4.创建会话(第一个参数:是否在事务中处理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.创建一个目标 Destination destination = session.createTopic(TOPIC_NAME); // 6.创建一个消费者 MessageConsumer consumer = session.createConsumer(destination); // 7.创建一个监听器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { System.out.println("接收消息异常:"); e.printStackTrace(); } } }); // 8.关闭连接 //connection.close(); } }

JMS主题模式:

“订阅者先订阅,发布者后发布消息  ---导致--> 订阅者才能收到消息"

就个人理解,先启动订阅者就是先于发布者监听目标队列,其次再由发布者向目标队列发送消息,这样订阅者才会收到信息。如果在订阅前先发布消息再订阅,那么之前的消息收不到,订阅之后的消息还能收到。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:springMVC如何防止表单重复提交详解
下一篇:POWER指令集架构正式开源
相关文章

 发表评论

暂时没有评论,来抢沙发吧~