干货|SpringBoot JMS(ActiveMQ)API实践应用详解

网友投稿 245 2023-05-21

[[350573]]

 前言

Active是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。AC-tiveMQ使用Apache提供的授权,任何人都可以对其实现代码进行修改。

ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。ActiveMQ实现了JMS标准并提供了很多附加的特性。本文将带大家详细介绍ActiveMQ的API的使用。

1. JMS的概念?

「什么是JMS呢:」

JMS---------JAVA Message Service JAVA的消息服务,是sun公司提供的接口,只是一个规范,这个规范就类似于JDBC是一样的,使用的时候是需要当前规范的实现产品的。

「JMS能干什么呢:」

能够将信息发布到目的地 可以从目的地来消费这个消息

2、两种通信模型

「队列的通信概念:」

特点:当我们同一个队列有多个消费者的时候,多个消费者的数据之和才是原来队列中的所有数据 队列的通信模型最大的适用场景:流量的消峰,高并发的处理

「主题的通信模型:」

特点:当我们队列有多个消费者的时候,那么这多个消费者消费到的数据是一样的 主题消费者通信模型的适用场景:微服务下服务之间的异步通信

3. MQ的实现产品

「实现产品:」

ActiveMQ RabbitMQ RockerMQ Kafka(这个设计的初衷是做分布式的日志的,后来因为日志有严格的顺序问题,这个时候人们就用Kafka来做消息队列了)

4、JMS中常见的名词

「常见的名词:」

ActiveMQConnectionFactory:这个是创建连接的工厂 ConnectionFactory:连接的工厂 Connection:连接JAVA对MQ的一个连接 Destination:目的地 生产者(Producer) 消费者(Consumer) Session:会话(每一次对MQ的操作都称为一次会话) Queue:队列 Topic:主题

5、什么是消息队列

「消息队列简单的说就是用来存放临时数据的地方:」

生产者----------->存储介质上 消费者----------->存储介质上

「消息队列类似于快递公司:」

你可以将东西交给快递公司 目标人也可以从快递公司去取东西

6. ActiveMQ是什么

「含义:」

ActiveMQ就是一个JMS的实现产品,它能够实现JMS下的所有功能

7、ActiveMQ能干什么

「主要作用:」

流量消峰处理 微服务下模块的异步通信 处理高并发下的订单 处理第三方平台的高并发 协助消息表可以完成分布式事务的最终一致性

8、ActiveMQ的安装

「ActiveMQ的安装和配置:」

复制 1、官网下载Linux版的ActiveMQ(最新版本为5.13.4)         https://activemq.apache.org/download.html         2、解压安装        tar -zxvf apache-activemq-5.13.4-bin.tar.gz         3、配置(这里采用默认配置,无需修改)        vim /usr/lical/activemq-1/conf/activemq.xml         4、启动        cd /usr/local/activemq-1/bin  ./activemq start        5、打开管理界面(管理界面可以查看并管理所有队列及消息)           http://192.168.1.100:8161         启动成功后,可以浏览 http://localhost:8161/admin/         默认用户名、密码:admin/admin         管理界面是用jetty做容器的,如果想修改管理界面的端口,可以编辑../conf/jetty.xml,找到下面这一段:        <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"     <!-- the default port       number for the web console -->     <property name="host" value="0.0.0.0"/>      <property name="port" value="8161"/>       </bean>         用户名/密码是在 ../conf/jetty-realm.properties 里,比如要增加一个管理员jimmy/123456,可参考下面修改:         1  3admin: admin, admin  jimmy: 123456, admin  useruseruser        注:管理界面有一个小坑,ActiveMQ 5.13.2与jdk1.8兼容性有点问题,如果使用jdk1.8,管理界面进入Queues标签页时,偶尔会报错,但是并不影响消息正常收发,只是无法从界面上查看队列情况,如果出现该问题,可将jdk版本降至1.7,同时最好清空data目录下的所有数据,再重启activemq即可。  1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.

9. ActiveMQ的API的使用

「AcatveMQ的API使用:」

队列的使用(生产者)
复制package com.qy.mq.queue;  import org.apache.activemq.ActiveMQConnectionFactory;  import org.apache.activemq.Message;  import javax.jms.*;  /**   * @Auther: qianyu   * @Date: 2020/11/04 14:12   * @Description:生产者   */  public class Producer {      //准备发布的这个地址      private  static final String PATH="tcp://10.7.182.87:61616"     //ActiveMQ下的用户名      private static final String userName="admin"     //ActiveMQ下的密码      private static final String password="admin"     publicstatic void main(String[] args) throws JMSException {          //第一步:创建连接的工厂          ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);          //通过这个工厂获取连接          Connectionconnection = activeMQConnectionFactory.createConnection();          //第三步:打开这个连接          connection.start();          //第四步:创建操作MQ的这个会话          /**           * 第一个参数:是否使用事务           * 第二个参数:客户端的应答模式           *     第一种:自动应答           *     第二种:客户端手动应答           */          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);          //需要发送消息的目的地(queue操作的是队列)          Destination destination=session.createQueue("wqqq");          //生产者来生产这个消息          //要有生产者          MessageProducer messageProducer = session.createProducer(destination);          //发送很多的消息到消息队列中去  //        for (int i=0;i<100;i++){              //需要准备发送的消息  //            TextMessage textMessage = session.createTextMessage("我是浅羽:"+i);              //研究消息的类型            /*  BytesMessage bytesMessage = session.createBytesMessage();              bytesMessage.setByteProperty("www",new Byte("123"));              //接下来就可以发送消息了              messageProducer.send(bytesMessage);*/          //创建map类型的message          /*MapMessage mapMessage = session.createMapMessage();          mapMessage.setInt("www1",123);          messageProducer.send(mapMessage);*/          ObjectMessage objectMessage = session.createObjectMessage(new User(1, "qianyu""123"));          messageProducer.send(objectMessage);  //        }      }  1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.
队列的使用(消费者)
复制package com.qy.mq.queue;  import org.apache.activemq.ActiveMQConnectionFactory;  import javax.jms.*;  import java.io.Serializable /**   * @Auther: qianyu   * @Date: 2020/11/04 14:13   * @Description:消费者   */  public class Consumer {      //准备发布的这个地址      private  static final String PATH="tcp://10.7.182.87:61616"     //ActiveMQ下的用户名      private static final String userName="admin"     //ActiveMQ下的密码      private static final String password="admin"     publicstatic void main(String[] args) throws JMSException {          //第一步:创建连接的工厂          ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);          //通过这个工厂获取连接          Connectionconnection = activeMQConnectionFactory.createConnection();          //第三步:打开这个连接          connection.start();          //第四步:创建操作MQ的这个会话          /**           * 第一个参数:是否使用事务           * 第二个参数:客户端的应答模式           *     第一种:自动应答           *     第二种:客户端手动应答           */          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);          //需要发送消息的目的地(queue操作的是队列)          Destination destination=session.createQueue("wqqq");          //创建我们的消费者了          MessageConsumer messageConsumer = session.createConsumer(destination);          //接下来就可以接收我们的消息了          //Message message = messageConsumer.receive();          //接收消息并指定这个超时的时间  //      Message message = messageConsumer.receive(5000);          //接收消息没有就不等待了 直接over了  不接收了  //      Message message = messageConsumer.receiveNoWait();          //给定当前的路径设置监听器          messageConsumer.setMessageListener(new MessageListener() {              @Override              public void onMessage(Message message) {                 /* BytesMessage bytesMessage= (BytesMessage) message;                  try {                      System.out.println("获取到的数据是:"+bytesMessage.getByteProperty("www"));                  } catch (JMSException e) {                      e.printStackTrace();                  }*/                 /*MapMessage mapMessage= (MapMessage) message;                  try {                      System.out.println("获取到的数据是:"+mapMessage.getInt("www1"));                  } catch (JMSException e) {                      e.printStackTrace();                  }*/                 //测试对象类型的消息的发送和接收                 ObjectMessage objectMessage= (ObjectMessage) message;                  try {                      Useruser = (User) objectMessage.getObject();                      System.out.println("接收到的数据是:"+user);                  } catch (JMSException e) {                      e.printStackTrace();                  }                /*  //我们知道是一个字符串类型的消息                  TextMessage textMessage= (TextMessage) message;                  //接下来就可以打印这个消息了                  try {                      System.out.println("消费者1---接收到的消息是:"+textMessage.getText());                  } catch (JMSException e) {                      e.printStackTrace();                  }*/                  try {                      //这句话就表示的是客户端来手动的进行应答                      message.acknowledge();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }          });      }  1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.97.98.99.100.101.102.
主题模型的生产者
复制package com.qy.mq.topic;  import org.apache.activemq.ActiveMQConnectionFactory;  import javax.jms.*;  /**   * @Auther: qianyu   * @Date: 2020/11/04 15:17   * @Description:   */  public class Producer {      //准备发布的这个地址      private  static final String PATH="tcp://10.7.182.87:61616"     //ActiveMQ下的用户名      private static final String userName="admin"     //ActiveMQ下的密码      private static final String password="admin"     publicstatic void main(String[] args) throws JMSException {          //第一步:创建连接的工厂          ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);          //通过这个工厂获取连接          Connectionconnection = activeMQConnectionFactory.createConnection();          //第三步:打开这个连接          connection.start();          //第四步:创建操作MQ的这个会话          /**           * 第一个参数:是否使用事务           * 第二个参数:客户端的应答模式           *     第一种:自动应答           *     第二种:客户端手动应答           */          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);          //需要发送消息的目的地(下面创建的就应该是主题模型的地址)          Destination destination=session.createTopic("topic222");          //生产者来生产这个消息          //要有生产者          MessageProducer messageProducer = session.createProducer(destination);          //发送很多的消息到消息队列中去          for (int i=0;i<100;i++){              //需要准备发送的消息              TextMessage textMessage = session.createTextMessage("我是浅羽:"+i);              //接下来就可以发送消息了              messageProducer.send(textMessage);          }      }  1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.
主题模型的消费者
复制package com.qy.mq.topic;  import org.apache.activemq.ActiveMQConnectionFactory;  import javax.jms.*;  /**   * @Auther: qianyu   * @Date: 2020/11/04 15:19   * @Description:   */  public class Consumer {      //准备发布的这个地址      private  static final String PATH="tcp://10.7.182.87:61616"     //ActiveMQ下的用户名      private static final String userName="admin"     //ActiveMQ下的密码      private static final String password="admin"     publicstatic void main(String[] args) throws JMSException {          //第一步:创建连接的工厂          ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);          //通过这个工厂获取连接          Connectionconnection = activeMQConnectionFactory.createConnection();          //第三步:打开这个连接          connection.start();          //第四步:创建操作MQ的这个会话          /**           * 第一个参数:是否使用事务           * 第二个参数:客户端的应答模式           *     第一种:自动应答           *     第二种:客户端手动应答           */          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);          //需要发送消息的目的地(queue操作的是队列)          Destination destination=session.createTopic("topic222");          //创建我们的消费者了          MessageConsumer messageConsumer = session.createConsumer(destination);          //接下来就可以接收我们的消息了          //给定当前的路径设置监听器          messageConsumer.setMessageListener(new MessageListener() {              @Override              public void onMessage(Message message) {                  //我们知道是一个字符串类型的消息                  TextMessage textMessage= (TextMessage) message;                  //接下来就可以打印这个消息了                  try {                      System.out.println("消费者1---接收到的消息是:"+textMessage.getText());                  } catch (JMSException e) {                      e.printStackTrace();                  }                  try {                      message.acknowledge();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }          });      }  1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.

本篇关于ActiveMQ的介绍就先到这里结束了,后续会出更多关于ActiveMQ系列更多文章,谢谢大家支持!

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:APISpace的 人脸检测API 它来啦~
下一篇:手机号能改归属地吗?手机区号归属地查询带你了解!
相关文章

 发表评论

暂时没有评论,来抢沙发吧~