RabbitMQ知识点整理1-生产和消费消息

网友投稿 259 2022-09-20

RabbitMQ知识点整理1-生产和消费消息

演示如何使用RabbitMQ Java 客户端生产和消费消息

目前最新的RabbitMQ Java 客户端版本为5.9.0, 相应的maven 构建文件如下:

com.rabbitmq amqp-client x.x.x

版本号根据项目的实际情况进行选择.

通过 ​​RabbitMQ 的 Web 管理界面,默认用户名密码都是 guest。

默认情况下,访问RabbitMQ 服务的用户名和密码都是"guest " , 这个账户有限制,默认只能通过本地网络(如localhost) 访问,远程网络访问受限,所以在实现生产和消费消息之前,需要另外添加一个用户,并设置相应的访问权限。添加新的用户可以登录到管理界面之后进行手动添加, 没有难度

计算机的世界是从" Hello World!" 开始的,这里我们也沿用惯例, 首先生产者发送一条消息、"Hello World ! "至RabbitMQ 中, 之后由消费者消费。下面先演示生产者客户端的代码,接着再演示消费者客户端的代码。

生产者客户端代码:

package demo.java.web.amqp.rabbitmq.demo1;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;/** * 生产者客户端代码 * * @author jiangkd * @date 2020/10/11 */public class RabbitProducer { final private static String EXCHANGE_NAME = "exchange_demo"; final private static String ROUTING_KEY = "routingkey_demo"; final private static String QUEUE_NAME = "queue_demo"; final private static String IP_ADDRESS = "172.16.176.40"; /** * rabbitmq服务端默认端口号为5672 */ final private static int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException { // 和服务器建立连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("root"); factory.setPassword("root123"); // 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel(); // 创建一个type为direct、持久化的、非自动删除的交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); // 创建一个持久化的、非排它的、非自动删除的队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 将交换机与队列通过路由绑定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 发送一条持久化的消息 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); // 关闭资源 channel.close(); connection.close(); }}

View Code

上面的生产者客户端的代码首先和RabbitMQ 服务器建立一个连接Connection, 然后在这个连接之上创建一个信道Channel。之后创建一个交换器Exchange 和一个队列Queue ,并通过路由键进行绑定,然后发送一条消息, 最后关闭资源。

消费者客户端代码:

package demo.java.web.amqp.rabbitmq.demo1;import java.io.IOException;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Address;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者客户端代码 * * @author jiangkd * @date 2020/10/11 */public class RabbitComsumer { final private static String QUEUE_NAME = "queue_demo"; final private static String IP_ADDRESS = "172.16.176.40"; /** * rabbitmq服务端默认端口号为5672 */ final private static int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUsername("root"); connectionFactory.setPassword("root123"); Address[] addresses = {new Address(IP_ADDRESS, PORT)}; // 注意, 这里获取连接的方式和生产者客户端不同 Connection connection = connectionFactory.newConnection(addresses); // 创建信道 Channel channel = connection.createChannel(); channel.basicQos(64); // 设置消费者客户端最多接收未被ack的消息的个数 // 定义consumer Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息: " + new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); // 等待回调函数执行完毕之后, 关闭资源 TimeUnit.SECONDS.sleep(5); channel.close(); connection.close(); }}

View Code

注意这里采用的是继承DefaultConsumer 的方式来实现消费....不推荐使用QueueingConsumer

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:vue-cli3.x+electron安装教程
下一篇:MongoDB教程8-MongoDB创建和查看数据库
相关文章

 发表评论

暂时没有评论,来抢沙发吧~