Netty实现HTTP服务器
异步事件驱动,网络应用程序框架,快速开发可维护的高性能协议服务器和客户端。
Pipeline处理链,由一系列ChannelInboundHandler和ChannelOutboundHandler串联组成,ChannelInboundHandler是用来Inbound事件的处理程序,ChannelOutboundHandler是Outbound事件的处理程序。
HTTP请求及常见的Content-Type类型
1、application/x-www-form-urlencoded, POST提交数据的方式,原生Form表单,如果不设置enctype属性的默认方式;
2、multipart/form-data,POST提交数据的方式,Form表单的enctype设置为multipart/form-data,表单的数据处理为一条消息,以标签为单元,用分隔符(这就是boundary的作用)分开,这种方式将数据有很多部分,它既可以上传键值对,也可以上传文件,甚至多个文件。
3、application/json,JSON格式。
4、binary (application/octet-stream),只能提交二进制。
实现HTTP服务器案例
<dependency><groupId>io.nettygroupId><artifactId>netty-allartifactId><version>4.1.60.Finalversion>dependency><dependency><groupId>com.google.protobufgroupId><artifactId>protobuf-javaartifactId><version>3.15.6version>dependency>
packagecom.what21.netty.http;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.ChannelFuture;importio.netty.channel.EventLoopGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.nio.NioServerSocketChannel;publicclassNettyHttpServer{publicstaticvoidmain(String[] args){// 访问地址:http://127.0.0.1:8080/intserverPort =8080;// 初始化==>用于Acceptor的主"线程池"EventLoopGroup bossEventGroup =newNioEventLoopGroup();// 初始化==>用于I/O工作的从"线程池"NioEventLoopGroup workerEventGroup =newNioEventLoopGroup();try{
ServerBootstrap serverBootstrap =newServerBootstrap();// group方法设置主从线程池serverBootstrap.group(bossEventGroup, workerEventGroup)// 指定通道channel类型,服务端为:NioServerSocketChannel.channel(NioServerSocketChannel.class)
// 添加Handler.childHandler(newNettyHttpServerInitializer());// 绑定并侦听端口ChannelFuture channelFuture = serverBootstrap.bind(serverPort).sync();// 等待服务监听端口关闭channelFuture.channel().closeFuture().sync();
}catch(InterruptedException e) {
e.printStackTrace();
}finally{// 优雅退出,释放"线程池"bossEventGroup.shutdownGracefully();
workerEventGroup.shutdownGracefully();
}
}
}
packagecom.what21.netty.http;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelPipeline;importio.netty.channel.socket.SocketChannel;importio.netty.handler.codec.http.HttpServerCodec;/**
* netty 实现简单的 http 协议:配置 解码器、handler
*/publicclassNettyHttpServerInitializerextendsChannelInitializer<SocketChannel>{@OverrideprotectedvoidinitChannel(SocketChannel socketChannel)throwsException{
ChannelPipeline pipeline = socketChannel.pipeline();// 管道中添加 netty 提供的 http 编解码器// HttpServerCodec==>http编解码的处理类pipeline.addLast("HttpServerCodec",newHttpServerCodec());
pipeline.addLast("MyNettyHttpServerHandler",newNettyHttpServerHandler());
}
}
package com.what21.netty.http;importio.netty.buffer.ByteBuf;importio.netty.buffer.Unpooled;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.SimpleChannelInboundHandler;importio.netty.handler.codec.http.*;importio.netty.util.CharsetUtil;importjava.net.URI;/**
* netty 实现简单的 http:handler
*/publicclassNettyHttpServerHandlerextendsSimpleChannelInboundHandler<HttpObject>{/**
* 触发读取事件
*
* @param channelHandlerContext 通道上下文
* @param httpObject http数据
* @throws Exception 异常
*/@Overrideprotected void channelRead0(ChannelHandlerContextchannelHandlerContext,HttpObjecthttpObject)throwsException{if(httpObject instanceofHttpRequest) {System.out.println("[服务端] 数据类型: "+ httpObject.getClass());System.out.println("[服务端] 客户端地址: "+ channelHandlerContext.channel().remoteAddress());HttpRequesthttpRequest = (HttpRequest) httpObject;finalURIuri = newURI(httpRequest.uri());if("/favico.ico".equals(uri.getPath())) {System.out.println("[服务端]请求了 favico.ico,不做处理 ");return;
}ByteBufbyteBuf =Unpooled.copiedBuffer("[http helloworld]",CharsetUtil.UTF_8);FullHttpResponseresponse = newDefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK, byteBuf);
response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
channelHandlerContext.writeAndFlush(response);
}
}
}
实现简单的TCP通信案例
服务器端:
packagecom.what21.netty.channel.demo;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.ChannelFuture;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelOption;importio.netty.channel.EventLoopGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioServerSocketChannel;publicclassNettyTcpServer{publicstaticvoidmain(String[] args){// 初始化==>用于Acceptor的主"线程池"EventLoopGroup bossGroup =newNioEventLoopGroup();// 初始化==>用于I/O工作的从"线程池"NioEventLoopGroup workerGroup =newNioEventLoopGroup();try{
ServerBootstrap serverBootstrap =newServerBootstrap();// group方法设置主从线程池serverBootstrap.group(bossGroup, workerGroup)// 使用 NioServerSocketChannel作为服务器的通道实现.channel(NioServerSocketChannel.class)
// 设置 线程队列 等待 连接的 个数
.option(ChannelOption.SO_BACKLOG, 128)
// 设置保持 活动/生存的 连接状态
.childOption(ChannelOption.SO_KEEPALIVE,true)
// 设置管道工厂
.childHandler(newChannelInitializer<SocketChannel>(){@OverrideprotectedvoidinitChannel(SocketChannel socketChannel)throwsException{
socketChannel.pipeline().addLast(newNettyTcpServerHandler());
}
});
System.out.println("[服务器]启动成功");// 绑定并侦听端口ChannelFuture channelFuture = serverBootstrap.bind(6688).sync();// 等待服务监听端口关闭channelFuture.channel().closeFuture().sync();
}catch(InterruptedException e) {
e.printStackTrace();
}finally{// 优雅退出,释放"线程池"bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
packagecom.what21.netty.channel.demo;importio.netty.buffer.ByteBuf;importio.netty.buffer.Unpooled;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandler;importio.netty.util.CharsetUtil;importlombok.extern.slf4j.Slf4j;importjava.util.concurrent.TimeUnit;@Slf4jpublicclassNettyTcpServerHandlerimplementsChannelInboundHandler{@OverridepublicvoidchannelRegistered(ChannelHandlerContext ctx)throwsException{
System.out.println("channelRegistered()"+ ctx);
}@OverridepublicvoidchannelUnregistered(ChannelHandlerContext ctx)throwsException{
System.out.println("channelUnregistered()"+ ctx);
}/**
* 客户端发起连接,服务端通道就绪,触发本方法
*
*@paramctx 通道
*@throwsException 异常
*/@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{
System.out.println("channelActive()"+ ctx);
System.out.println("[服务端]通道连接准备就绪");
}@OverridepublicvoidchannelInactive(ChannelHandlerContext var1)throwsException{
System.out.println("channelInactive()"+ var1);
}/**
* 客户端连接后发送数据,服务端接收数据时触发
*
*@paramctx 通道
*@parammsg 数据
*@throwsException 异常
*/@OverridepublicvoidchannelRead(ChannelHandlerContext ctx, Object msg)throwsException{
System.out.println("channelRead()"+ ctx);// 用户 自定义 普通任务ctx.channel().eventLoop().execute(() -> {try{
TimeUnit.SECONDS.sleep(2);
ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]客户端处理耗时阻塞", CharsetUtil.UTF_8));
}catch(InterruptedException e) {
e.printStackTrace();
}
});// 自定义 定时任务 提交到 ScheduledTaskQueue队列中。ctx.channel().eventLoop().schedule(() -> {try{
TimeUnit.SECONDS.sleep(2);
ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]客户端定时任务", CharsetUtil.UTF_8));
}catch(InterruptedException e) {
e.printStackTrace();
}
},5, TimeUnit.SECONDS);// 读取 客户端上传的 信息System.out.println("[服务端]服务端读取线程:"+ Thread.currentThread().getName());
ByteBuf byteBuffer = (ByteBuf) msg;
String message = byteBuffer.toString(CharsetUtil.UTF_8);
System.out.println("[服务端]服务端接收数据:"+ message);
}/**
* 服务端数据读取结束后触发
*
*@paramctx 通道
*@throwsException 异常
*/@OverridepublicvoidchannelReadComplete(ChannelHandlerContext ctx)throwsException{
System.out.println("channelReadComplete()"+ ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]数据接收完毕", CharsetUtil.UTF_8));
}@OverridepublicvoiduserEventTriggered(ChannelHandlerContext ctx, Object msg)throwsException{
System.out.println("userEventTriggered()"+ ctx);
}@OverridepublicvoidchannelWritabilityChanged(ChannelHandlerContext ctx)throwsException{
System.out.println("channelWritabilityChanged()"+ ctx);
}@OverridepublicvoidhandlerAdded(ChannelHandlerContext ctx)throwsException{
System.out.println("handlerAdded()"+ ctx);
}@OverridepublicvoidhandlerRemoved(ChannelHandlerContext ctx)throwsException{
System.out.println("handlerRemoved()"+ ctx);
}/**
* 发生异常,关闭通道
*
*@paramctx 通道
*@paramcause 原因
*@throwsException 异常
*/@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause)throwsException{
ctx.close();
}
}
客户端实现:
packagecom.what21.netty.channel.demo;importio.netty.bootstrap.Bootstrap;importio.netty.channel.ChannelFuture;importio.netty.channel.ChannelInitializer;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioSocketChannel;/**
* netty 实现 tcp服务
*/publicclassNettyTcpClient{publicstaticvoidmain(String[] args){// 初始化==>用于I/O工作的从"线程池"finalNioEventLoopGroup eventLoopGroup =newNioEventLoopGroup();try{
Bootstrap bootstrap =newBootstrap();// group方法设置线程池bootstrap.group(eventLoopGroup)// 设置客户端 通道 是 NioSocketChannel 类型.channel(NioSocketChannel.class)
// 设置管道工厂
.handler(newChannelInitializer<SocketChannel>(){@OverrideprotectedvoidinitChannel(SocketChannel socketChannel)throwsException{
socketChannel.pipeline().addLast(newNettyTcpClientHandler());
}
});
System.out.println("[客户端]启动成功");// 连接ChannelFuture channelFuture = bootstrap.connect("localhost",6688);
channelFuture.channel().closeFuture().sync();
}catch(InterruptedException e) {
e.printStackTrace();
}finally{// 优雅退出,释放"线程池"eventLoopGroup.shutdownGracefully();
}
}
}
packagecom.what21.netty.channel.demo;importio.netty.buffer.ByteBuf;importio.netty.buffer.Unpooled;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandler;importio.netty.util.CharsetUtil;importlombok.extern.slf4j.Slf4j;@Slf4jpublicclassNettyTcpClientHandlerimplementsChannelInboundHandler{@OverridepublicvoidchannelRegistered(ChannelHandlerContext ctx)throwsException{
System.out.println("channelRegistered()"+ ctx);
}@OverridepublicvoidchannelUnregistered(ChannelHandlerContext ctx)throwsException{
System.out.println("channelUnregistered()"+ ctx);
}@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{
System.out.println("channelActive()"+ ctx);
System.out.println("[客户端]通道就绪");
ctx.writeAndFlush(Unpooled.copiedBuffer("[这儿有内鬼,终止交易]", CharsetUtil.UTF_8));
}@OverridepublicvoidchannelInactive(ChannelHandlerContext ctx)throwsException{
System.out.println("channelInactive()"+ ctx);
}/**
*@paramctx 通道
*@parammsg 数据
*@throwsException 异常
*/@OverridepublicvoidchannelRead(ChannelHandlerContext ctx, Object msg)throwsException{
System.out.println("channelRead()"+ ctx);
ByteBuf byteBuf = (ByteBuf) msg;
String message = byteBuf.toString(CharsetUtil.UTF_8);
System.out.println("[客户端]服务端地址:"+ ctx.channel().remoteAddress() +",服务端回复信息:"+ message);
}@OverridepublicvoidchannelReadComplete(ChannelHandlerContext ctx)throwsException{
System.out.println("channelReadComplete()"+ ctx);
}@OverridepublicvoiduserEventTriggered(ChannelHandlerContext ctx, Object msg)throwsException{
System.out.println("userEventTriggered()"+ ctx);
}@OverridepublicvoidchannelWritabilityChanged(ChannelHandlerContext ctx)throwsException{
System.out.println("userEventTriggered()"+ ctx);
}@OverridepublicvoidhandlerAdded(ChannelHandlerContext ctx)throwsException{
System.out.println("handlerAdded()"+ ctx);
}@OverridepublicvoidhandlerRemoved(ChannelHandlerContext ctx)throwsException{
System.out.println("handlerRemoved()"+ ctx);
}/**
* 发生异常,关闭通道
*
*@paramctx 通道
*@paramcause 原因
*@throwsException 异常
*/@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause)throwsException{
System.out.println("exceptionCaught()"+ ctx);
cause.printStackTrace();
ctx.close();
}
}
声明:本文部分素材转载自互联网,如有侵权立即删除 。
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,请使用WINRAR解压,如遇到无法解压的请联系管理员!
8. 精力有限,不少源码未能详细测试(解密),不能分辨部分源码是病毒还是误报,所以没有进行任何修改,大家使用前请进行甄别
丞旭猿论坛
暂无评论内容