Springboot之整合Socket连接案例

网友投稿 365 2023-02-06

Springboot之整合Socket连接案例

Socket连接与硬件通信

一、如何让socket随着springboot项目一起启动

SpringBoot中CommandLineRunner的作用:平常开发中有可能需要实现在项目启动后执行的功能,SpringBoot提供的一种简单的实现方案就是添加一个model并实现CommandLineRunner接口,实现功能的代码放在实现的run方法中

具体实现

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.CommandLineRunner;

import org.springframework.stereotype.Component;

import java.net.ServerSocket;

import java.net.Socket;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

/**

* @author 易水●墨龙吟

* @Description

* @create 2019-04-14 23:40

*/

@Component

public class TestRunner implements CommandLineRunner {

@Autowired

private SocketProperties properties;

@Override

public void run(String... args) throws Exception {

ServerSocket server = null;

Socket socket = null;

server = new ServerSocket(properties.getPort());

System.out.println("设备服务器已经开启, 监听端口:" + properties.getPort());

ThreadPoolExecutor pool = new ThreadPoolExecutor(

properties.getPoolCore(),

properties.getPoolMax(),

properties.getPoolKeep(),

TimeUnit.SECONDS,

new ArrayBlockingQueue(properties.getPoolQueueInit()),

new ThreadPoolExecutor.DiscardOldestPolicy()

);

while (true) {

socket = server.accept();

pool.execute(new ServerConfig(socket));

}

}

}

此处使用了自定义的线程池,提高对于socket的客户端处理能力。

二、自定义配置并使用

此处将socket的端口和线程池的一些配置放到 application.yml中使用,方便使用和修改

# Socket配置

socket:

# 监听端口 2323

port: 2323

# 线程池 - 保持线程数 20

pool-keep: 20

# 线程池 - 核心线程数 10

pool-core: 10

# 线程池 - 最大线程数 20

pool-max: 30

# 线程队列容量 10

pool-queue-init: 10

import lombok.Getter;

import lombok.Setter;

import lombok.ToString;

import org.springframework.boot.context.properties.ConfigurationProperties;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.PropertySource;

import org.springframework.stereotype.Component;

/**

* @author 易水●墨龙吟

* @Description

* @create 2019-04-18 22:35

*/

@Setter

@Getter

@ToString

@Component

@Configuration

@PropertySource("classpath:application.yml")

@ConfigurationProperties(prefix = "socket")

public class SocketProperties {

private Integer port;

private Integer poolKeep;

private Integer poolCore;

private Integer poolMax;

private Integer poolQueueInit;

}

三、Socket对于客户端发来的信息的处理和重发机制

当客户端端连接之后发送信息,如果超时未发送,将会关闭,发送数据有异常将会返回给客户端一个error,让客户端在发送一次数据。

import com.farm.config.socket.resolve.MessageChain;

import com.farm.service.EnvironmentService;

import com.farm.service.impl.EnvironmentServiceImpl;

import java.io.*;

import java.net.Socket;

import java.net.SocketException;

import java.net.SocketTimeoutException;

import java.util.Map;

/**

* @author 易水●墨龙吟

* @Description

* @create 2019-04-14 23:21

*/

public class ServerConfig extends Thread {

private Socket socket;

public ServerConfig(Socket socket) {

this.socket = socket;

}

// 获取spring容器管理的类,可以获取到sevrice的类

private EnvironmentService service = SpringUtil.getBean(EnvironmentServiceImpl.class);

private String handle(InputStream inputStream) throws IOException, DataFormException {

byte[] bytes = new byte[1024];

int len = inputStream.read(bytes);

if (len != -1) {

StringBuffer request = new StringBuffer();

request.append(new String(bytes, 0, len, "UTF-8"));

System.out.println("接受的数据: " + request);

System.out.println("from client ... " + request + "当前线程" + Thread.currentThread().getName());

Map map = MessageChain.out(request.toString());

System.out.println("处理的数据" + map);

Integer res = service.addEnvironment(map);

if (res == 1) {

return "ok";

} else {

throw new DataFormException("数据处理异常");

}

} else {

throw new DataFormException("数据处理异常");

}

}

@Override

public void run() {

BufferedWriter writer = null;

try {

// 设置连接超时9秒

socket.setSoTimeout(9000);

System.out.println("客户 - " + socket.getRemoteSocketAddress() + " -> 机连接成功");

InputStream inputStream = socket.getInputStream();

writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));

String result = null;

try {

result = handle(inputStream);

writer.write(result);

writer.newLine();

writer.flush();

} catch (IOException | DataFormException | IllegalArgumentException e) {

writer.write("error");

writer.newLine();

writer.flush();

System.out.println("发生异常");

try {

System.out.println("再次接受!");

result = handle(inputStream);

writer.write(result);

writer.newLine();

writer.flush();

} catch (DataFormException | SocketTimeoutException ex) {

System.out.println("再次接受, 发生异常,连接关闭");

}

}

} catch (SocketException socketException) {

socketException.printStackTrace();

try {

writer.close();

} catch (IOException ioException) {

ioException.printStackTrace();

}

} catch (IOException e) {

e.printStackTrace();

} finally {

try {

writer.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

在此处有一个坑,如果客户端是用C/C++编写的,必须使用如下方法:

byte[] bytes = new byte[1024];

int len = inputStream.read(bytes);

如果使用readLine或者 DataInputStream dataInputStream =new DataInputStream(socket.getInputStream())这样会出现使用TCP连接助手,客户端发送数据收不到。

四、如何在普通类中使用Spring注入类

这里需要使用一个工具类。

import org.springframework.beans.BeansException;

import org.springframework.context.ApplicationContext;

import org.springframework.context.ApplicationContextAware;

import org.springframework.stereotype.Component;

/**

* @author 易水●墨龙吟

* @Description

* @create 2019-04-15 0:01

*/

@Component

public class SpringUtil implements ApplicationContextAware {

private static ApplicationContext applicationContext;

@Override

public void setApplicationContext(ApplicationContext applicatiohttp://nContext) throws BeansException {

if (SpringUtil.applicationContext == null) {

SpringUtil.applicationContext = applicationContext;

}

}

/**

* 获取applicationContext

* @return

*/

public static ApplicationContext getApplicationContext() {

return applicationContext;

}

/**

* 通过name获取 Bean.

* @param name

* @return

*/

public static Object getBean(String name){

return getApplicationContext().getBean(name);

}

/**

* 通过class获取Bean.

* @param clazz

* @param

* @return

*/

public static T getBean(Class clazz){

return getApplicationContext().getBean(clazz);

}

/**

* 通过name,以及Clazz返回指定的Bean

* @param name

* @param clazz

* @param

* @return

*/

public static T getBean(String name,Class clazz){

return getApplicationContext().getBean(name, clazz);

}

}

补充:springboot下websocket前台后端数据长连接

首先导入依赖

org.springframework.boot

spring-boot-starter-websocket

org.springframework.security

spring-security-messaging

spring-security-messaging 是后面继承 AbstractSecurityWebSocketMessageBrokerConfigurer需要用到的依赖

WebSocketConfig

@Configuration

@EnableWebSocketMessageBroker //此注解表示使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping

public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override

public void registerStompEndpoints(StompEndpointRegistry registry) {

/**

* 注册 Stomp的端点

* addEndpoint:添加STOMP协议的端点。这个HTTP URL是供WebSocket或Sockjs客户端访问的地址

* withSockJS:指定端点使用SockJS协议

*/

registry.addEndpoint("/websocket/tracker") //物流消息通道,

.setAllowedOrigins("*") //允许跨域,里面路径可以设定

.withSockJS() //指定协议

.setInterceptors(httpSessionHandshakeInterceptor()) ; //设置拦截器()

}

@Override

public void configureMessageBroker(MessageBrokerRegistry registry) {

/**

* 配置消息代理

* 启动简单Broker,消息的发送的地址符合配置的前缀来的消息才发送到这个broker

*/

registry.enableSimpleBroker("/topic","/user");

}

//拦截器

@Bean

public HandshakeInterceptor httpSessionHandshakeInterceptor() {

return new HandshakeInterceptor() {

@Override

public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception {

//可以在这里先判断登录是否合法

return true;

}

@Override

public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {

//握手成功后,

}

};

}

}

WebsocketSecurityConfiguration

@Configuration

public class WebsocketSecurityConfiguration extends AbstractSecurityWebSocketMessageBrokerConfigurer {

@Override

protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {

messages

.nullDestMatcher().authenticated()

.simpDestMatchers("/topic/**").authenticated()

.simpDestMatchers("/user/**").authenticated()

.simpTypeMatchers(SimpMessageType.MESSAGE, SimpMessageType.SUBSCRIBE).denyAll()

// catch all

.anyMessage().denyAll();

}

/**

* Disables CSRF for Websockets.

*/

@Override

protected boolean sameOriginDisabled() {

return true;

}

}

WebSocketResource

package com.gleam.shopmall.web.rest;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.ApplicationListener;

import org.springframework.messaging.handler.annotation.MessageMapping;

import org.springframework.messaging.handler.annotation.SendTo;

import org.springframework.messaging.simp.SimpMessageHeaderAccessor;

import org.springframework.messaging.simp.SimpMessageMappingInfo;

import org.springframework.messaging.simp.SimpMessageSendingOperations;

import org.springframework.stereotype.Controller;

import org.springframework.web.socket.messaging.SessionDisconnectEvent;

@Controller

public class WebSocketResource {

private static final Logger log = LoggerFactory.getLogger(WebSocketResource.class);

@Autowired

SimpMessageSendingOperations messagingTemplate;

//此方法适用于网页聊天室,从前端接收数据,返回订阅者(前端)

@MessageMapping("/welcome") //指定要接收消息的地址,类似@RequestMapping

@SendTo("/topic/getResponse") //默认消息将被发送到与传入消息相同的目的地,但是目的地前面附加前缀(默认情况下为“/topic”}

public String say(String message) throws Exception {

return message;

}

//发送指定用户(直接从后端发送数据到前端)

public void sendToUser(String login,String channel, String info) {

log.debug("[ToUser]WEBSOCKET发送消息, username={}, info={}", login, info);

this.messagingTemplate.convertAndSendToUser(login, channel, info);

log.debug("[ToUser]WEBSOCKET发送消息:完成");

}

//发送所有订阅的(直接从后端发送数据到前端)

public void send(String channel, String info) {

log.debug("[ToAll]WEBSOCKET发送消息, info={}", info);

// this.messagingTemplate.convertAndSend(channel, info);

this.messagingTemplate.convertAndSend("/topic/getResponse", "接收到了吗?");

log.debug("[ToAll]WEBSOCKET发送消息:完成");

}

}

前端html

貌似你的浏览器不支持websocket

```

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:springboot 之jpa高级查询操作
下一篇:SpringBoot2 JPA解决懒加载异常的问题
相关文章

 发表评论

暂时没有评论,来抢沙发吧~