目录
Netty实现HTTP服务器
Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
Pipeline处理链,由一系列ChannelInboundHandler和ChannelOutboundHandler串联组成,ChannelInboundHandler是用来处理Inbound事件的处理程序,ChannelOutboundHandler是用来处理Outbound事件的处理程序。
HTTP请求及常见的Content-Type类型
1、application/x-www-form-urlencoded, POST提交数据的方式,原生Form表单,如果不设置enctype属性的默认方式;
2、multipart/form-data,POST提交数据的方式,Form表单的enctype设置为multipart/form-data,表单的数据处理为一条消息,以标签为单元,用分隔符(这就是boundary的作用)分开,这种方式将数据分成多个部分,它既可以上传键值对,也可以上传文件,甚至多个文件。
3、application/json,JSON格式。
4、binary (application/octet-stream),只能提交二进制。
实现HTTP服务器案例
<!-- netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.60.Final</version> </dependency> <!-- protobuf-java --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.15.6</version> </dependency>package com.what21.netty.http; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class NettyHttpServer { public static void main(String[] args) { // 访问地址::8080/ int serverPort = 8080; // 初始化==>用于Acceptor的主"线程池" EventLoopGroup bossEventGroup = new NioEventLoopGroup(); // 初始化==>用于I/O工作的从"线程池" NioEventLoopGroup workerEventGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); // group方法设置主从线程池 serverBootstrap.group(bossEventGroup, workerEventGroup) // 指定通道channel类型,服务端为:NioServerSocketChannel .channel(NioServerSocketChannel.class) // 添加 Handler .childHandler(new NettyHttpServerInitializer()); // 绑定并侦听端口 ChannelFuture channelFuture = serverBootstrap.bind(serverPort).sync(); // 等待服务监听端口关闭 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 优雅退出,释放"线程池" bossEventGroup.shutdownGracefully(); workerEventGroup.shutdownGracefully(); } } }package com.what21.netty.http; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; /** * netty 实现简单的 http 协议:配置 解码器、handler */ public class NettyHttpServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 管道中添加 netty 提供的 http 编解码器 // HttpServerCodec==>http编解码的处理类 
pipeline.addLast("HttpServerCodec", new HttpServerCodec()); pipeline.addLast("MyNettyHttpServerHandler", new NettyHttpServerHandler()); } }package com.what21.netty.http; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; import java.net.URI; /** * netty 实现简单的 http:handler */ public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> { /** * 触发读取事件 * * @param channelHandlerContext 通道上下文 * @param httpObjecthttp数据 * @throws Exception 异常 */ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception { if (httpObject instanceof HttpRequest) { System.out.println("[服务端] 数据类型: " + httpObject.getClass()); System.out.println("[服务端] 客户端地址: " + channelHandlerContext.channel().remoteAddress()); HttpRequest httpRequest = (HttpRequest) httpObject; final URI uri = new URI(httpRequest.uri()); if ("/favico.ico".equals(uri.getPath())) { System.out.println("[服务端]请求了 favico.ico,不做处理 "); return; } ByteBuf byteBuf = Unpooled.copiedBuffer("[http helloworld]", CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes()); channelHandlerContext.writeAndFlush(response); } } }实现简单的TCP通信案例
服务器端:
package com.what21.netty.channel.demo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Entry point of the demo TCP server: binds port 6688 and blocks until the
 * listening channel is closed.
 */
public class NettyTcpServer {

    public static void main(String[] args) {
        // First group passed to group(...): the "boss" pool used by the acceptor.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // Second group passed to group(...): the "worker" pool used for channel I/O.
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // Register the boss/worker groups.
            serverBootstrap.group(bossGroup, workerGroup)
                    // Use NioServerSocketChannel as the server channel implementation.
                    .channel(NioServerSocketChannel.class)
                    // Size of the queue of connections waiting to be accepted.
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // Enable keep-alive on accepted connections.
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // Pipeline initializer applied to every accepted child channel.
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyTcpServerHandler());
                        }
                    });
            System.out.println("[服务器]启动成功");
            // Bind the port and wait until the bind has completed.
            ChannelFuture channelFuture = serverBootstrap.bind(6688).sync();
            // Block until the listening channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Graceful shutdown: release both event loop groups.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

package com.what21.netty.channel.demo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * Server-side inbound handler of the TCP demo. It logs every lifecycle
 * callback, prints the received text and writes a few replies, two of them
 * from tasks submitted to the channel's event loop.
 */
@Slf4j
public class NettyTcpServerHandler implements ChannelInboundHandler {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered()" + ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered()" + ctx);
    }

    /**
     * Invoked when the channel becomes active, i.e. a client has connected.
     *
     * @param ctx channel context
     * @throws Exception on failure
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive()" + ctx);
        System.out.println("[服务端]通道连接准备就绪");
    }

    @Override
    public void channelInactive(ChannelHandlerContext var1) throws Exception {
        System.out.println("channelInactive()" + var1);
    }

    /**
     * Invoked when data sent by the client has been read. Submits one
     * immediate and one delayed task to the channel's event loop, then prints
     * the received text and releases the inbound buffer.
     *
     * @param ctx channel context
     * @param msg inbound message; cast to {@link ByteBuf}
     * @throws Exception on failure
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead()" + ctx);

        // Plain user task submitted to the channel's event loop. The sleep
        // simulates slow work and therefore delays whatever else is queued
        // on that loop (demo only).
        ctx.channel().eventLoop().execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]客户端处理耗时阻塞", CharsetUtil.UTF_8));
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        });

        // Delayed task: scheduled on the event loop to start after 5 seconds.
        ctx.channel().eventLoop().schedule(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]客户端定时任务", CharsetUtil.UTF_8));
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, 5, TimeUnit.SECONDS);

        // Read what the client sent.
        System.out.println("[服务端]服务端读取线程:" + Thread.currentThread().getName());
        ByteBuf byteBuffer = (ByteBuf) msg;
        try {
            String message = byteBuffer.toString(CharsetUtil.UTF_8);
            System.out.println("[服务端]服务端接收数据:" + message);
        } finally {
            // The message is not forwarded to another handler, so this handler
            // has to release the reference-counted buffer itself.
            byteBuffer.release();
        }
    }

    /**
     * Invoked after the current read operation has completed; sends an
     * acknowledgement to the client.
     *
     * @param ctx channel context
     * @throws Exception on failure
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete()" + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]数据接收完毕", CharsetUtil.UTF_8));
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("userEventTriggered()" + ctx);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelWritabilityChanged()" + ctx);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded()" + ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved()" + ctx);
    }

    /**
     * Invoked when an exception reaches this handler: reports the cause and
     * closes the channel.
     *
     * @param ctx   channel context
     * @param cause the exception that was raised
     * @throws Exception on failure
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report the failure before closing, the same way the client handler does.
        System.out.println("exceptionCaught()" + ctx);
        cause.printStackTrace();
        ctx.close();
    }
}

客户端实现:
package com.what21.netty.channel.demo; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * netty 实现 tcp服务 */ public class NettyTcpClient { public static void main(String[] args) { // 初始化==>用于I/O工作的从"线程池" final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); // group方法设置线程池 bootstrap.group(eventLoopGroup) // 设置客户端 通道 是 NioSocketChannel 类型 .channel(NioSocketChannel.class) // 设置管道工厂 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyTcpClientHandler()); } }); System.out.println("[客户端]启动成功"); // 连接 ChannelFuture channelFuture = bootstrap.connect("localhost", 6688); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 优雅退出,释放"线程池" eventLoopGroup.shutdownGracefully(); } } } package com.what21.netty.channel.demo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; @Slf4j public class NettyTcpClientHandler implements ChannelInboundHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered()" + ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelUnregistered()" + ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive()" + ctx); System.out.println("[客户端]通道就绪"); ctx.writeAndFlush(Unpooled.copiedBuffer("[这儿有内鬼,终止交易]", 
CharsetUtil.UTF_8)); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive()" + ctx); } /** * @param ctx 通道 * @param msg 数据 * @throws Exception 异常 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channelRead()" + ctx); ByteBuf byteBuf = (ByteBuf) msg; String message = byteBuf.toString(CharsetUtil.UTF_8); System.out.println("[客户端]服务端地址:" + ctx.channel().remoteAddress() + ",服务端回复信息:" + message); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete()" + ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("userEventTriggered()" + ctx); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { System.out.println("userEventTriggered()" + ctx); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerAdded()" + ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved()" + ctx); } /** * 发生异常,关闭通道 * * @param ctx 通道 * @param cause 原因 * @throws Exception 异常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("exceptionCaught()" + ctx); cause.printStackTrace(); ctx.close(); } }