GitHub地址:https://github.com/saseke/eos-netty

netty 群推(Netty实现简单群聊)(1)

Server端

EosServer.java

package com.songmengyuan.eos.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.text.SimpleDateFormat;

/**
 * Group-chat server: accepts TCP connections on a fixed port and installs a
 * string decoder/encoder plus {@link EosServerServiceHandler} on every
 * accepted channel.
 */
public class EosServer {

    /** TCP port the server binds to. */
    private final int port;

    /**
     * @param port TCP port to listen on
     */
    public EosServer(int port) {
        this.port = port;
    }

    /**
     * Binds the server to the configured port and blocks the calling thread
     * until the server channel is closed. Both event loop groups are shut
     * down when this method exits, whether normally or through an exception.
     *
     * @throws Exception if binding or waiting on the close future fails or is interrupted
     */
    public void start() throws Exception {
        // The boss group uses a single event loop.
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            // Configure the bootstrap.
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, workers)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            // A fresh handler instance is created for each accepted channel.
                            pipeline.addLast("serviceHandler", new EosServerServiceHandler());
                        }
                    });
            // Bind the port and wait for the bind to complete.
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("服务器启动");
            // Block here until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}

EosServerServiceHandler.java

package com.songmengyuan.eos.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Server-side chat handler: announces users joining and leaving, and relays
 * every received text message to all channels registered in the shared group.
 */
public class EosServerServiceHandler extends SimpleChannelInboundHandler<String> {

    // The channel group keeps track of every registered channel; it is shared by all handler instances.
    private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // MM = month and HH = 24-hour clock (lower-case mm/hh would mean minutes / 12-hour clock).
    // SimpleDateFormat is not thread-safe, so this stays a per-handler instance field.
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");

    /**
     * Announces the new user to the channels already in the group, then adds
     * the new channel to the group.
     *
     * @param ctx context of the channel whose pipeline this handler was added to
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        String msg = sdf.format(new Date()) + " 用户 : " + channel.remoteAddress() + " 上线\n";
        System.out.println(msg);
        // Notify the other users first; the new channel is not in the group yet,
        // so it does not receive its own "online" notice.
        channelGroup.writeAndFlush(msg);
        // Register the current channel in the group.
        channelGroup.add(channel);
    }

    /**
     * Tells the channels in the group that this user went offline.
     *
     * @param ctx context of the channel whose pipeline this handler was removed from
     */
    // The channel group drops the closed channel on its own, so no explicit remove is done here.
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        String msg = sdf.format(new Date()) + " 用户: " + channel.remoteAddress() + " 下线\n";
        channelGroup.writeAndFlush(msg);
    }

    /**
     * Relays a received message to every channel in the group. The sender gets
     * a timestamped echo tagged as its own message; every other channel gets
     * the message prefixed with the sender's remote address.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        Channel channel = ctx.channel();
        String message = "用户: " + channel.remoteAddress() + " 发送了: " + msg;
        System.out.println(message);
        channelGroup.forEach(c -> {
            if (c == channel) {
                // This is the sender's own channel: echo with a timestamp.
                String s = sdf.format(new Date()) + "[自己] :" + msg;
                c.writeAndFlush(s);
            } else {
                c.writeAndFlush(message);
            }
        });
    }

    /**
     * Reports the failure on standard error and closes the channel.
     *
     * @param ctx   context of the channel on which the error was raised
     * @param cause the error that propagated through the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Do not drop the cause silently: a failing connection should leave a trace.
        System.err.println("连接异常, 关闭连接: " + ctx.channel().remoteAddress() + " " + cause);
        ctx.close();
    }
}

EosServerBootstrap.java

package com.songmengyuan.eos.server; public class EosServerBootstrap { public static void main(String[] args) throws Exception { EosServer server = new EosServer(9000); server.start(); } }

Client端

EosClient.java

package com.songmengyuan.eos.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

/**
 * Group-chat client: connects to the server and forwards each line typed on
 * standard input to it. Incoming messages are handled by
 * {@link EosClientServiceHandler}.
 */
public class EosClient {

    private final String host;
    private final int port;

    /**
     * @param host server host name or address
     * @param port server TCP port
     */
    public EosClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server, then reads standard input line by line and
     * writes each line to the channel. Returns when standard input is
     * exhausted or the connection is no longer active; the event loop group
     * is shut down on exit.
     *
     * @throws InterruptedException if interrupted while waiting for the connection
     */
    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast("decoder", new StringDecoder());
                    pipeline.addLast("encoder", new StringEncoder());
                    pipeline.addLast("clientService", new EosClientServiceHandler());
                }
            });
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            System.out.println("登陆成功");
            // Write what the user types into the channel.
            Scanner scanner = new Scanner(System.in);
            // hasNextLine() is the matching check for nextLine(); hasNext() looks for the
            // next token instead and does not line up with line-based reads.
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                if (!channelFuture.channel().isActive()) {
                    // The connection is gone; stop instead of writing to a dead channel.
                    break;
                }
                channelFuture.channel().writeAndFlush(msg);
            }
        } finally {
            group.shutdownGracefully();
        }
    }
}

EosClientServiceHandler.java

package com.songmengyuan.eos.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Client-side inbound handler that prints every received text message to
 * standard output.
 */
public class EosClientServiceHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the received message followed by a line break.
     *
     * @param ctx context of the channel the message arrived on (not used)
     * @param msg decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
    }
}

EosClientBootstrap.java

package com.songmengyuan.eos.client; public class EosClientBootstrap { public static void main(String[] args) throws InterruptedException { EosClient client = new EosClient("127.0.0.1",9000); client.start(); } }

,