JAVA Netty实现聊天室+私聊功能的示例代码

网友投稿 249 2023-03-25

JAVA Netty实现聊天室+私聊功能的示例代码

功能介绍

使用Netty框架实现聊天室功能,服务器可监控客户端上下限状态,消息转发。同时实现了点对点私聊功能。技术点我都在代码中做了备注,这里不再重复写了。希望能给想学习netty的同学一点参考。

服务器代码

服务器入口代码

package nio.test.netty.groupChat;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

import io.netty.util.concurrent.Future;

import io.netty.util.concurrent.GenericFutureListener;

/**

* netty群聊 服务器端

* @author zhang

*

*/

public class NettyChatServer {

private int port;

public NettyChatServer(int port){

this.port = port;

}

//初始化 netty服务器

private void init() throws Exception{

EventLoopGroup boss = new NioEventLoopGroup(1);

EventLoopGroup work = new NioEventLoopGroup(16);

try {

ServerBootstrap boot = new ServerBootstrap();

boot.group(boss,work);

boot.channel(NioServerSocketChannel.class);//设置boss selector建立channel使用的对象

boot.option(ChannelOption.SO_BACKLOG, 128);//boss 等待连接的 队列长度

boot.childOption(ChannelOption.SO_KEEPALIVE, true); //让客户端保持长期活动状态

boot.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

//从channel中获取pipeline 并往里边添加Handler

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("encoder",new StringEncoder());

pipeline.addLast("decoder",new StringDecoder());

pipeline.addLast(new ServerMessageHandler());//自定义Handler来处理消息

}

});

System.out.println("服务器开始启动...");

//绑定端口

ChannelFuture channelFuture = boot.bind(port).sync();

channelFuture.addListener(new GenericFutureListener>() {

@Override

public void operationComplete(Future super Void> future)

throws Exception {

if(future.isSuccess()){

System.out.println("服务器正在启动...");

}

if(future.isDone()){

System.out.println("服务器启动成功...OK");

}

}

});

//监听channel关闭

channelFuture.channel().closeFuture().sync();

channelFuture.addListener(new GenericFutureListener>() {

@Override

public void operationComplete(Future super Void> future)

throws Exception {

if(future.isCancelled()){

System.out.println("服务器正在关闭..");

}

if(future.isCancellable()){

System.out.println("服务器已经关闭..OK");

}

}

});

}finally{

boss.shutdownGracefully();

work.shutdownGracefully();

}

}

/**

* 启动服务器 main 函数

* @param args

* @throws Exception

*/

public static void main(String[] args) throws Exception {

new NettyChatServer(9090).init();

}

}

服务器端消息处理Handler

package nio.test.netty.groupChat;

import io.netty.channel.Channel;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.channel.group.ChannelGroup;

import io.netty.channel.group.DefaultChannelGroup;

import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.HashMap;

import java.util.Map;

/**

* 自定义 服务器端消息处理Handler

* @author zhang

*

*/

public class ServerMessageHandler extends SimpleChannelInboundHandler{

/**

* 管理全局的channel

* GlobalEventExecutor.INSTANCE 全局事件监听器

* 一旦将channel 加入 ChannelGroup 就不要用手动去

* 管理channel的连接失效后移除操作,他会自己移除

*/

private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

/**

* 为了实现私聊功能,这里key存储用户的唯一标识,

* 我保存 客户端的端口号

* 当然 这个集合也需要自己去维护 用户的上下线 不能像 ChannelGroup那样自己去维护

*/

private static Map all = new HashMap();

private SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

/**

* 处理收到的消息

*/

@Override

protected void channelRead0(ChannelHandlerContext ctx, String msg)

throws Exception {

Channel channel = ctx.channel();

/**

* 这里简单判断 如果内容里边包含#那么就是私聊

*/

if(msg.contains("#")){

String id = msg.split("#")[0];

String body = msg.split("#")[1];

Channel userChannel = all.get(id);

String key = channel.remoteAddress().toString().split(":")[1];

userChannel.writeAndFlush(sf.format(new Date())+"\n 【用户】 "+key+" 说 : "+body);

return;

}

//判断当前消息是不是自己发送的

for(Channel c : channels){

String addr = c.remoteAddress().toString();

if(channel !=c){

c.writeAndFlush(sf.format(new Date())+"\n 【用户】 "+addr+" 说 : "+msg);

}else{

c.writeAndFlush(sf.format(new Date())+"\n 【自己】 "+addr+" 说 : "+msg);

}

}

}

/**

* 建立连接以后第一个调用的方法

*/

@Override

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

Channel channel = ctx.channel();

String addr = channel.remoteAddress().toString();

/**

* 这里 ChannelGroup 底层封装会遍历给所有的channel发送消息

MffahOpg*

*/

channels.writeAndFlush(sf.format(new Date())+"\n 【用户】 "+addr+" 加入聊天室 ");

channels.add(channel);

String key = channel.remoteAddress().toString().split(":")[1];

all.put(key, channel);

}

/**

* channel连接状态就绪以后调用

*/

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

String addr = ctx.channel().remoteAddress().toString();

System.out.println(sf.format(new Date())+" \n【用户】 "+addr+" 上线 ");

}

/**

* channel连接状态断开后触发

*/

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

String addr = ctx.channel().remoteAddress().toString();

System.out.println(sf.format(new Date())+" \n【用户】 "+addr+" 下线 ");

//下线移除

String key = ctx.channel().remoteAddress().toString().split(":")[1];

all.remove(key);

}

/**

* 连接发生异常时触发

*/

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

//System.out.println("连接发生异常!");

ctx.close();

}

/**

* 断开连接会触发该消息

* 同时当前channel 也会自动从ChannelGroup中被移除

*/

@Override

public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

Channel channel = ctx.channel();

String addr = channel.remoteAddress().toString();

/**

* 这里 ChannelGroup 底层封装会遍历给所有的channel发送消息

*

*/

channels.writeAndFlush(sf.format(new Date())+"\n 【用户】 "+addr+" 离开了 ");

//打印 ChannelGroup中的人数

System.out.println("当前在线人数是:"+channels.size());

System.out.println("all:"+all.size());

}

}

客户端主方法代码

package nio.test.netty.groupChat;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

import io.netty.util.concurrent.Future;

import io.netty.util.concurrent.GenericFutureListener;

import java.util.Scanner;

public class NettyChatClient {

private String ip;

private int port;

public NettyChatClient(String ip,int port){

this.ip = ip;

this.port = port;

}

/**

* 初始化客户

*/

private void init() throws Exception{

//创建监听事件的监听器

EventLoopGroup work = new NioEventLoopGroup();

try {

Bootstrap boot = new Bootstrap();

boot.group(work);

boot.channel(NioSocketChannel.class);

boot.handler(new ChannelInitializer() {

@Override

protected void initChannel(NioSocketChannel ch)

throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("encoder",new StringEncoder());

pipeline.addLast("decoder",new StringDecoder());

pipeline.addLast(new ClientMessageHandler());

}

});

ChannelFuture channelFuture = boot.connect(ip, port).sync();

channelFuture.addListener(new GenericFutureListener>() {

@Override

public void operationComplete(Future super Void> future)

throws Exception {

if(future.isSuccess()){

System.out.println("客户端启动中...");

}

if(future.isDone()){

System.out.println("客户端启动成功...OK!");

}

}

});

System.out.println(channelFuture.channel().localAddress().toString());

System.out.println("#################################################");

System.out.println("~~~~~~~~~~~~~~端口号#消息内容~~这样可以给单独一个用户发消息~~~~~~~~~~~~~~~~~~");

System.out.println("#################################################");

/**

* 这里用控制台输入数据

*/

Channel channel = channelFuture.channel();

//获取channel

Scanner scanner = new Scanner(System.in);

while(scanner.hasNextLine()){

String str = scanner.nextLine();

channel.writeAndFlush(str+"\n");

}

channelFuture.channel().closeFuture().sync();

scanner.close();

} finally {

work.shutdownGracefully();

}

}

/**

* 主方法入口

* @param args

* @throws Exception

*/

public static void main(String[] args) throws Exception{

new NettyChatClient("127.0.0.1",9090).init();

}

}

客户端消息处理Handler

package nio.test.netty.groupChat;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

/**

* 客户点消息处理 Handler

* @author zhang

*

*/

public class ClientMessageHandler extends SimpleChannelInboundHandler {

/**

* 处理收到的消息

*/

@Override

protected void channelRead0(ChannelHandlerContext ctx, String msg)

throws Exception {

System.out.println(msg);

}

/**

* 连接异常后触发

*/

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

ctx.close();

}

}

测试结果

启动了四个客户端 服务器端日志效果如下:

客户端一端日志:

客户端二日志:

客户端三日志:

客户端四日志:

现在在客户端四发送消息:

每个客户端都可以收到消息:

软化关闭客户端客户端三:

服务器日志:

其他客户端日志:

发送私聊消息:

这个客户端收不到消息

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:f搜 (f搜引擎官网)(22秒之前已更新)
下一篇:java获取linux服务器上的IP操作
相关文章

 发表评论

暂时没有评论,来抢沙发吧~