springboot基于Redis发布订阅集群下WebSocket的解决方案

网友投稿 223 2023-02-06

springboot基于Redis发布订阅集群下WebSocket的解决方案

一、背景

单机节点下,WebSocket连接成功后,可以直接发送消息。而多节点下,连接时通过nginx会代理到不同节点。

假设一开始用户连接了node1的socket服务。触发消息发送的条件的时候也通过nginx进行代理,假如代理转到了node2节点上,那么node2节点的socket服务就发送不了消息,因为一开始用户注册的是node1节点。这就导致了消息发送失败。

为了解决这一方案,消息发送时,就需要一个中间件来记录,这样,三个节点都可以获取消息,然后在根据条件进行消息推送。

二、解决方案(springboot 基于 Redis发布订阅)

1、依赖

org.springframework.boot

spring-boot-starter-data-redis

org.springframework.boot

spring-boot-starter-websocket

2、创建业务处理类 Demo.class,该类可以实现MessageListener接口后重写onMessage方法,也可以不实现,自己写方法。

import com.alibaba.fastjson.JSON;

import com.dy.servicedgwzaDs.impl.OrdersServiceImpl;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.connection.Message;

import org.springframework.data.redis.connection.MessageListener;

import org.springframework.stereotype.Component;

import java.util.HashMap;

/**

* @program:

* @description: redis消息订阅-业务处理

* @author: zhang yi

* @create: 2021-01-25 16:46

*/

@Component

public class Demo implements MessageListener {

Logger logger = LoggerFactory.getLogger(this.getClass());

@Override

public void onMessage(Message message, byte[] pattern) {

logger.info("消息订阅成功---------");

logger.info("内容:"+message.getBody());

logger.info("交换机:"+message.getChannel());

}

}

3、创建PubSubConfig配置类

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.cache.annotation.EnableCaching;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.data.redis.connection.RedisConnectionFactory;

import org.springframework.data.redis.core.StringRedisTemplate;

import org.springframework.data.redis.listener.PatternTopic;

import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**

* @program:

* @description: redis发布订阅配置

* @author: zhang yi

* @create: 2021-01-25 16:49

*/

@Configuration

@EnableCaching

public class PubSubConfig {

Logger logger = LoggerFactory.getLogger(thishttp://.getClass());

//如果是多个交换机,则参数为(RedisConnectionFactory connectionFactory,

// MessageListenerAdapter listenerAdapter,

// MessageListenerAdapter listenerAdapter2)

@Bean

RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,

MessageListenerAdapter listenerAdapter) {

RedisMessageListenerContainer container = new RedisMessageListenerContainer();

container.setConnectionFactory(connectionFactory);

// 可以添加多个 messageListener,配置不同的交换机

container.addMessageListener(listenerAdapter, new PatternTopic("channel:demo"));

//container.addMessageListener(listenerAdapter2, new PatternTopic("channel:demo2"));

return container;

}

/**

* 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法

* @param demo 第一步的业务处理类

* @return

*/

@Bean

MessageListenerAdapter listenerAdapter(Demo demo) {

logger.info("----------------消息监听器加载成功----------------");

// onMessage 就是方法名,基于反射调用

return new MessageListenerAdapter(demo, "onMessage");

}

/**

* 多个交换机就多写一个

* @param subCheckOrder

* @return

*/

//@Bean

//MessageListenerAdapter listenerAdapter2(SubCheckOrder subCheckOrder) {

// logger.info("----------------消息监听器加载成功----------------");

// return new MessageListenerAdapter(subCheckOrder, "onMessage");

//}

@Bean

StringRedisTemplate template(RedisConnectionFactory connectionFactory) {

return new StringRedisTemplate(connectionFactory);

}

}

4、消息发布

@Autowired

private RedisTemplate redisTemplate;

redisTemplate.convertAndSend("channel:demo", "我是内容");

三、具体用法

socket连接成功。

socket消息推送时,把信息发布到redis中。socket服务订阅redis的消息,订阅成功后进行推送。集群下的socket都能订阅到消息,但是只有之前连接成功的节点能推送成功,其余的无法推送。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:spring boot+ redis 接口访问频率限制的实现
下一篇:Spring的@Validation和javax包下的@Valid区别以及自定义校验注解
相关文章

 发表评论

暂时没有评论,来抢沙发吧~