在SpringBoot中,如何使用Netty实现远程调用方法总结

网友投稿 259 2023-01-06

在SpringBoot中,如何使用Netty实现远程调用方法总结

Netty

Netty是一个NIO客户端服务器框架:

它可快速轻松地开发网络应用程序,例如协议服务器和客户端。

它极大地简化和简化了网络编程,例如TCP和UDP套接字服务器。

NIO是一种非阻塞IO ,它具有以下的特点

单线程可以连接多个客户端。

选择器可以实现单线程管理多个Channel,新建的通道都要向选择器注册。

一个SelectionKey键表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系。

selector进行select()操作可能会产生阻塞,但是可以设置阻塞时间,并且可以用wakeup()唤醒selector,所以NIO是非阻塞IO。

Netty模型selector模式

它相对普通NIO的在性能上有了提升,采用了:

NIO采用多线程的方式可以同时使用多个selector

通过绑定多个端口的方式,使得一个selector可以同时注册多个ServerSocketServer

单个线程下只能有一个selector,用来实现Channel的匹配及复用

半包问题

TCP/IP在发送消息的时候,可能会拆包,这就导致接收端无法知道什么时候收到的数据是一个完整的数据。在传统的BIO中在读取不到数据时会发生阻塞,但是NIO不会。为了解决NIO的半包问题,Netty在Selector模型的基础上,提出了reactor模式,从而解决客户端请求在服务端不完整的问题。

netty模型reactor模式

在selector的基础上解决了半包问题。

上图,简单地可以描述为"boss接活,让work干":manReactor用来接收请求(会与客户端进行握手验证),而subReactor用来处理请求(不与客户端直接连接)。

SpringBoot使用Netty实现远程调用

maven依赖

org.projectlombok

lombok

1.18.2

true

io.netty

netty-all

4.1.17.Final

服务端部分

NettyServer.java:服务启动监听器

@Slf4j

public class NettyServer {

public void start() {

InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8082);

//new 一个主线程组

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

//new 一个工作线程组

EventLoopGroup workGroup = new NioEventLoopGroup(200);

ServerBootstrap bootstrap = new ServerBootstrap()

.group(bossGroup, workGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new ServerChannelInitializer())

.localAddress(socketAddress)

//设置队列大小

.option(ChannelOption.SO_BACKLOG, 1024)

// 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文

.childOption(ChannelOption.SO_KEEPALIVE, true);

//绑定端口,开始接收进来的连接

try {

ChannelFuture future = bootstrap.bind(socketAddress).sync();

log.info("服务器启动开始监听端口: {}", socketAddress.getPort());

future.channel().closeFuture().sync();

} catch (InterruptedException e) {

log.error("服务器开启失败", e);

} finally {

//关闭主线程组

bossGroup.shutdownGracefully();

//关闭工作线程组

workGroup.shutdownGracefully();

}

}

}

ServerChannelInitializer.java:netty服务初始化器

/**

* netty服务初始化器

**/

public class ServerChannelInitializer extends ChannelInitializer {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

//添加编解码

socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));

socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));

socketChannel.pipeline().addLast(new NettyServerHandler());

}

}

NettyServerHandler.java:netty服务端处理器

/**

* netty服务端处理器

**/

@Slf4j

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

/**

* 客户端连接会触发

*/

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

log.info("Channel active......");

}

/**

* 客户端发消息会触发

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

log.info("服务器收到消息: {}", msg.toString());

ctx.write("你也好哦");

ctx.flush();

}

/**

* 发生异常触发

*/

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

cause.printStackTrace();

ctx.close();

}

}

RpcServerApp.java:SpringBoot启动类

/**

* 启动类

*

*/

@Slf4j

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})

public class RpcServerApp extends SpringBootServletInitializer {

@Override

protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {

return application.sources(RpcServerApp.class);

}

/**

* 项目的启动方法

*

* @param args

*/

public static void main(String[] args) {

SpringApplication.run(RpcServerApp.class, args);

//开启Netty服务

NettyServer nettyServer =new NettyServer ();

nettyServer.start();

log.info("======服务已经启动========");

}

}

客户端部分

NettyClientUtil.java:NettyClient工具类

/**

* Netty客户端

**/

@Slf4j

public class NettyClientUtil {

public static ResponseResult helloNetty(String msg) {

NettyClientHandler nettyClientHandler = new NettyClientHandler();

EventLoopGroup group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap()

.group(group)

//该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输

.option(ChannelOption.TCP_NODELAY, true)

.channel(NioSocketChannel.class)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline().addLast("decoder", new StringDecoder());

http:// socketChannel.pipeline().addLast("encoder", new StringEncoder());

socketChannel.pipeline().addLast(nettyClientHandler);

}

});

try {

ChannelFuture future = bootstrap.connect("127.0.0.1", 8082).sync();

log.info("客户端发送成功....");

//发送消息

future.channel().writeAndFlush(msg);

// 等待连接被关闭

future.channel().closeFuture().sync();

return nettyClientHandler.getResponseResult();

} catch (Exception e) {

log.error("客户端Netty失败", e);

throw new BusinessException(CouponTypeEnum.OPERATE_ERROR);

} finally {

//以一种优雅的方式进行线程退出

group.shutdownGracefully();

}

}

}

NettyClientHandler.java:客户端处理器

/**

* 客户端处理器

**/

@Slf4j

@Setter

@Getter

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

private ResponseResult responseResult;

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

log.info("客户端Active .....");

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

log.info("客户端收到消息: {}", msg.toString());

this.responseResult = ResponseResult.success(msg.toString(), CouponTypeEnum.OPERATE_SUCCESS.getCouponTypeDehttp://sc());

ctx.close();

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

cause.printStackTrace();

ctx.close();

}

}

验证

测试接口

@RestController

@Slf4j

public class UserController {

@PostMapping("/helloNetty")

@MethodLogPrint

public ResponseResult helloNetty(@RequestParam String msg) {

return NettyClientUtil.helloNetty(msg);

}

}

访问测试接口

服务端打印信息

客户端打印信息

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:豆瓣网站开放的api接口(豆瓣api v2 官网)
下一篇:浅谈Java开发架构之领域驱动设计DDD落地
相关文章

 发表评论

暂时没有评论,来抢沙发吧~