netty
# BIO 同步阻塞bio:链接数目较少 !\[image-20200922101237506\](https://i.loli.net/2020/09/22/tiBlm8WUurfb4sP.png) \`\`\`java public static void main(String args\[\]) throws IOException { ExecutorService pool = ThreadPool.getCachedThreadPool(); ServerSocket socket = new ServerSocket(6666); System.out.println("服务器启动...."); while(true){ final Socket client = socket.accept(); System.out.println("this is a client"); pool.execute(new Runnable() { @Override public void run() { handler(client); } }); } } public static void handler(Socket socket){ System.out.print(String.format("线程id: %s 线程name: %s",Thread.currentThread().getId(),Thread.currentThread().getName())); byte\[\] bytes = new byte\[1024\]; try (InputStream inputStream = socket.getInputStream()) { while(true){ int read = inputStream.read(bytes); if(read!=-1){ System.out.print(new String(bytes,0,read)); }else { break; } } } catch (IOException e) { e.printStackTrace(); }finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } \`\`\` # NIO - 同步非阻塞nio:链接数目较多且链接比较短,聊天,弹幕,服务器间通讯 - 三大组件: - channel, - 每个channel对应一个buffer - 双向,可以返回os底层的情况 - file channel - dataprogram channel - file channel - buffer, - 内存块,底层是数组 - 数据读取的位置,与bio不同,不使用流,但也可以双向读取(filp方法:反转) - 重要属性 - Capacity:容量大小,不可改变 - Limit:极限位置标记,可变,默认就是Capacity - mark:标记 默认-1 - Position:下一个要被读或写的位置索引 buffer的读写都依赖这四个属性, - Selector - 对应一个线程 - 对应多个channel \| \| \| \| ------------------------------------------------------------ \| ------------------------------------------------------------ \| \| !\[image-20200922110437831\](https://i.loli.net/2020/09/22/SKMy5eONbIkufXj.png) \| !\[image-20200922111521881\](https://i.loli.net/2020/09/22/LbhtungkqReaNzY.png) \| \`\`\`java String str = "hello world"; try (FileOutputStream stream = new FileOutputStream("d:/desktop/1.txt")) { FileChannel channel = stream.getChannel(); ByteBuffer buffer1 = ByteBuffer.allocate(1024); buffer1.put(str.getBytes()); buffer1.flip(); channel.write(buffer1); stream.close(); } catch (IOException e) { e.printStackTrace(); } \`\`\` !\[image-20200922120031552\](https://i.loli.net/2020/09/22/RV1wMjdHOJUL9Ef.png) 文件复制:同一个buffer !\[image-20200922120221883\](https://i.loli.net/2020/09/22/kpjWzmaU3uQ8nLE.png) 文件复制:使用通道的transform(),或transto()方法,更方便 文件直接修改,文件内容不再进入jvm,buffer直接映射到文件 !\[image-20200922122102802\](https://i.loli.net/2020/09/22/ldL245tx6wBimpf.png) 通信示例 \`\`\`java public static void server() throws Exception{ // 创建ServerSocketChannel ---》ServerSocket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); InetSocketAddress socketAddress = new InetSocketAddress(7000); serverSocketChannel.socket().bind(socketAddress); serverSocketChannel.configureBlocking(false); // 创建Selector Selector selector = Selector.open(); // serverSocketChannel注册到seletcor,事件为OP------ACCEPT serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while(true){ // 没有事件 if(selector.select(1000) == 0){ // selector 等待1秒钟 System.out.println("waiting for 1 seconds"); continue; } // 有事件 Set selectionKeys = selector.selectedKeys(); Iterator keyIterator = selectionKeys.iterator(); while(keyIterator.hasNext()){ // 获取key SelectionKey key = keyIterator.next(); // 获取对应通道 if(key.isAcceptable()){ // 生成socketChannel SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); // socketChannel注册到selector,关注事件为OP_read socketChannel.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024)); } if (key.isReadable()){ // 反向获取到channel SocketChannel channel = (SocketChannel) key.channel(); // 会获取到buffer ByteBuffer buffer = (ByteBuffer) key.attachment(); channel.read(buffer); System.out.println(String.format("当前线程:%s-%s,form客户端:%s",Thread.currentThread().getId(),Thread.currentThread().getName(),new String(buffer.array()))); } // 删除selectionkey,防止重复操作 keyIterator.remove(); } } } public static void client() throws Exception{ SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); // 未连接上或正在连接 if(!socketChannel.connect(new InetSocketAddress("127.0.0.1",7000))){ while(!socketChannel.finishConnect()){ System.out.println("正在连接,但客户端没有阻塞"); } } // 已连接 String str = "hello world"; ByteBuffer buffer = ByteBuffer.wrap(str.getBytes()); // 发送数据 socketChannel.write(buffer); } \`\`\` !\[image-20200922140108426\](https://i.loli.net/2020/09/22/q7QVlrYM4sRdLpH.png) !\[image-20200922101211665\](https://i.loli.net/2020/09/22/98RJtx1GgWaFvM2.png) # 零拷贝:没有cpu copy或copy 的信息很少 示例:服务器读取文件,发送给客户 - 传统:用户---》内核---》用户---》内核,三次状态切换,四次copy - 硬件--》内核:dma copy - 用户态--》内核态 - 内核---》用户:cpu copy - 内核态--》用户态 - 用户---》socket: cpu copy - 用户态---》内核态 - socket---》硬件协议:dma copy - mmap:内存映射,将用户内存映射到内核 - 还是3次切换,但只有3次copy - 硬件--》内核:dma copy - 用户态--》内核态 - 此时内核数据用户共享,但此时程序获取数据会进行状态切换,所以还是3次状态切换 - 内核---》socket: cpu copy - 用户态---》内核态 - socket---》硬件协议:dma copy sendFile:2次切换,3次copy linux2.1 - 硬件--》内核:dma copy - 用户态--》内核态 - 内核---》socket:cup copy - 内核态---》用户态 - socket---》硬件协议:dma copy sendFile:2次切换,2次copy linux2.4 - 硬件--》内核:dma copy - 用户态--》内核态 - 内核态---》用户态 - 内核(socket)---》硬件协议:dma copy # AIO 异步非阻塞aio:链接数目多,并且链接时间长 !\[image-20200922153711739\](https://i.loli.net/2020/09/22/H9Ysx2LwJRvSl7a.png) # Netty - 用于基于nio的数据传输(大数据,小数据都可)框架 - 自己用nio写,还是比较偏底层的,比较麻烦 - 简化nio的开发流程 - tcp/upd(传输协议)----》nio(基于传输协议的api),netty------》用用协议(http,websocket,ssl等等) ###### 传统网络I/O服务模型 !\[image-20200922155132004\](https://i.loli.net/2020/09/22/SVZOUt41T6PDJWE.png) ###### REACTOR模型 - 基于I/O多路复用 - 基于线程池线程复用 !\[image-20200922160119610\](https://i.loli.net/2020/09/22/yTPZYd3capDq92k.png) 单Rector单线程 !\[image-20200922160638095\](https://i.loli.net/2020/09/22/EurIJjyqRVSHBA1.png) 单Rector多线程 !\[image-20200922160845862\](https://i.loli.net/2020/09/22/Usga2mBWzxHkAKY.png) ###### 主从Reactor !\[image-20200922162138699\](https://i.loli.net/2020/09/22/At4rRkvigUOPF8C.png) ###### Netty - BossGroup,负责接收客户端连接 - WorkGroup,负责读写 - NioEventLoopGroup,事件循环组 - NioEventLoop,不断循环的处理任务线程,有一个selector - boss nioeventloop - 轮训accept事件 - 处理accept事件,与client建立连接, - 生成NioSocketChannel,并注册到某个work的NIOEventLoop的selector - 处理任务队列的任务 - work nioeventloop - 轮训read,write事件 - 处理i/o事件,即read,write,在NiosocketChannel处理 - 处理队列任务 - work nioeventloop处理数据会使用pipline - pipline包含channel \`\`\`java // netty 聊天 public class Server{ public static void main(String\[\] args){ // 连个线程组, // 处理链接,默认线程数=cup核数\*2 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 处理业务 EventLoopGroup workGroup = new NioEventLoopGroup(); // 服务端启动对象 ServerBootstrap bootstrap = new ServerBootstrap(); try { bootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) // 通道类型 .option(ChannelOption.SO_BACKLOG, 128) // 线程队列链接数 .childOption(ChannelOption.SO_KEEPALIVE, true) //保持活动链接状态 .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyServerHandler()); } }); // 给workgroup的eventloop设置处理器 // 绑定端口,并启动 ChannelFuture cf = bootstrap.bind("127.0.0.1",6668).sync(); System.out.println("服务端启动...."); // 关闭,当有关闭事件时关闭 cf.channel().closeFuture().sync(); }catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } } class NettyServerHandler extends ChannelInboundHandlerAdapter { // 读取客户端消息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(ctx.channel().remoteAddress()); System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } \`\`\` \`\`\`java public class Client { public static void main(String\[\] args) { // 事件循环组 EventLoopGroup eventExecutors = new NioEventLoopGroup(); // 客户端对象 Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(eventExecutors) // 设置线程组 .channel(NioSocketChannel.class) // 设置通道实现类 .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler()); // 加入处理器 } }); // 启动 ChannelFuture cf = bootstrap.connect("127.0.0.1", 6668).sync(); cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { eventExecutors.shutdownGracefully(); } } } class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("ctx"+ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("客户端发来的信息",CharsetUtil.UTF_8)); } // 读取客户端消息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(ctx.channel().remoteAddress()); System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } \`\`\` # Netty Task - 用户自定义普通任务 - 用户自定义定时任务 - 非Reactor调用channel:服务器推送信息到客户端 \`\`\`java // 用户自定义的普通任务 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(ctx.channel().remoteAddress()); System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8)); // 若此时是耗时任务,客户端都要等待服务器执行完毕, // 通过下面方式可以异步执行,就是将任务交给了netty的task ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { System.out.println(ctx.channel().remoteAddress()); System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8)); } }); // 任务放在scheduleTaskQueue ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { System.out.println(ctx.channel().remoteAddress()); System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8)); } },5, TimeUnit.SECONDS); } \`\`\` # Netty实现Http协议 \`\`\`java public class Main{ public static void main(String\[\] args){ // 连个线程组, // 处理链,默认线程数=cup核数\*2 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 处理业务 EventLoopGroup workGroup = new NioEventLoopGroup(); // 服务端启动对象 ServerBootstrap bootstrap = new ServerBootstrap(); try { bootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) // 通道类型 .option(ChannelOption.SO_BACKLOG, 128) // 线程队列链接数 .childOption(ChannelOption.SO_KEEPALIVE, true) //保持活动链接状态 .childHandler(new ChannelInitializer() {// 给workgroup添加handler @Override protected void initChannel(SocketChannel ch) throws Exception {// 每个请求pipline与handler是不共享的, // 得到管道 ChannelPipeline pipeline = ch.pipeline(); // http 解码器 pipeline.addLast(new HttpServerCodec()); // 响应 pipeline.addLast(new NettyServerHandler()); } }); // 给workgroup的eventloop设置处理器 // 绑定端口,并启动 ChannelFuture cf = bootstrap.bind("127.0.0.1",8080).sync(); System.out.println("服务端启动...."); // 关闭,当有关闭事件时关闭 cf.channel().closeFuture().sync(); }catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } } // SimpleChannelInboundHandler是ChannelInboundHandlerAdapter的子类 // 通信数据被封装httpobject class NettyServerHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { System.out.println(ctx.channel().pipeline().hashCode()+" "+this.hashCode()); HttpRequest request = (HttpRequest) msg; URI uri = new URI(request.uri()); if("/favicon.ico".equals(uri.getPath())){ System.out.println("不作响应"); return; } ByteBuf content = Unpooled.copiedBuffer("我是服务器,", CharsetUtil.UTF_16); DefaultFullHttpResponse defaultFullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK,content); defaultFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain"); defaultFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes()); ctx.writeAndFlush(defaultFullHttpResponse); } } \`\`\` !\[image-20200922202021791\](https://i.loli.net/2020/09/22/yMD7cdeK2aE4FQJ.png) # Unpooled类 \`\`\`java // 该对象包含数组,读取的时候不同flip进行翻转,底层维护了readerIndex,writerIndex ByteBuf a = Unpooled.buffer(100); ByteBuf b = Unpooled.copiedBuffer("helloworld",CharsetUtil.UTF_8); \`\`\` # Netty心跳检测 \`\`\`java // 心跳检测处理器,触发器 // 3s没读取,发送检测包 // 5s没有写,发送检测包 // 7s没有读,也没有写,发送检测包 // 事件传递到下一个handler处理(自定义) pipline.addLast(new IdleStateHandler(3,5,7TimeUnit.SECONDS)) pipline.addLast(new ChannelInboundHandlerAdapter(){ public void userEventTriggered(ChannelHandlerContext ctx,Object evt){ if(evt instance of IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; swicth(event.state()){ case READER_IDLE: XXX case WRITER_IDLE: XXX case ALL_IDLE: XXX } } } }) \`\`\` # Websocket !\[image-20201002101425900\](https://i.loli.net/2020/10/02/ciQSzFdaNxsYkCe.png) \`\`\`java class WebSocketHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { ctx.channel().writeAndFlush(new TextWebSocketFrame("xxxx")); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { super.handlerAdded(ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { super.handlerRemoved(ctx); } } \`\`\` \`\`\`javascript var socket; if(windows.WebSocket){ socket = new WebSocket("ws://localhost:7000/hello"); socket.onmessage = function(ev){ 收到消息 } socket.onopen = function(ev){ 连接开启 } socket.onclose = function(ev){ 连接关闭 } } function send(message){ if(socket.readyState==WebSocket.OPEN){ socket.send(message); } } \`\`\` # 编码解码 Netty编码解码 - StringEncoder,StringDecoder - ObjectEncoder,ObjectDecoder 序列化的缺点 - 无法跨语言 - 序列化体积大 - 序列化性能低 ###### google的Protobuf - rpc数据交换 - http+json(以前) - tcp+protobuf (现在) - 跨语言,支持很多语言 - 高性能 \`\`\`protobuf syntax = "protobuf4";// 版本号, option optimize_for = SPEED; // 加快解析 option java_package = "top.deanxxx"; // 指定包名 option java_outer_classname = "MyData";//文件名,外部类名 message MyMessage{ enum DataType{ StudentType = 0; UserType = 1; } DataType data_Type = 1; // MyMessage 的第一个属性 oneof dataBody{ Stduent s = 2; // MyMessage 的另一个属性 User u = 3;// MyMessage 的另一个属性 } } message User{ int32 id = 1; // id属性 ,1 表示属性的序号 string name = 2; } message Student{ int32 id = 1; // id属性 ,1 表示属性的序号 string name = 2; } // protoc.exe --java_out=. xxx.proto \`\`\` - 编写proto文件 - 变为对对应语言 - 服务端对应语言中的proto编码器(handler),发送 - 客户端接收,对应解码器(handler) # 数据入栈出栈 !\[image-20201002134742983\](https://i.loli.net/2020/10/02/gCWvumod2BAqbU7.png) # TCP粘包与拆包 粘包:间隔时间短的包合并为一个 拆包:数据过大拆分 自定义协议+编解码器来解决 关键是解决服务器每次读取数据长度问题。 # 实现RPC !\[image-20201002144927179\](https://i.loli.net/2020/10/02/ngzsie86ZYdSMpC.png) \`\`\`JAVA // 公共接口 public interface{ String hello(String msg); } \`\`\` \`\`\`java // provider public class HelloService implement HelloService{ String hello(String msg){ } } public class ServerBootstrap{ public static void main(String args){ } } public class NettyServer{ // netty初始化 private static void startserver0(String hostname,int port){ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); try { bootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) // 通道类型 .option(ChannelOption.SO_BACKLOG, 128) // 线程队列链接数 .childOption(ChannelOption.SO_KEEPALIVE, true) //保持活动链接状态 .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new ServerHandler()); } }); ChannelFuture cf = bootstrap.bind(hostname,port).sync(); cf.channel().closeFuture().sync(); }catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } } public class ServerHandler{ channelRead: xxxxx } \`\`\` \`\`\`java public class NettyClient{ private static ExecutorService executor = Executors.newFixedThreadPool(4); private static NettyClient client; // 编写方法,代理模式 public Object getBean(final Classserviceclass,final String providerName){ return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class\[\]{serviceClass},(proxy,method,args)-\>{ // 每调用一次hello, if(cliet ==null){ initial(); } client.setPara(prividerName+args\[0\]); return executor.submit(client).get() }) } private static void initial(){ client = new NettyClient(); EventLoopGroup eventExecutors = new NioEventLoopGroup(); // 客户端对象 Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); } }); // 启动 ChannelFuture cf = bootstrap.connect("127.0.0.1", 8080).sync(); cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { eventExecutors.shutdownGracefully(); } } } } public class ClientHandler extends ChannelInboundHandlerAdapter implements Callable{ private ChannelHandlerContext ctx; private String result; private para; channelActivate(ctx){ ctx = ctx } synchronzied channelRead(msg){ result = msg.toString(); notify(); } synchronized call(){ ctx.writeAndFlush(para); wait(); retutn result; } } \`\`\`