CXYVIP官网源码交易平台_网站源码_商城源码_小程序源码平台-丞旭猿论坛
CXYVIP官网源码交易平台_网站源码_商城源码_小程序源码平台-丞旭猿论坛
CXYVIP官网源码交易平台_网站源码_商城源码_小程序源码平台-丞旭猿论坛

Java,Netty,实现HTTP服务器案例,实现简单的TCP通信案例-源码交易平台丞旭猿

Netty实现HTTP服务器

Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

Pipeline处理链,由一系列ChannelInboundHandler和ChannelOutboundHandler串联组成:ChannelInboundHandler是用来处理Inbound(入站)事件的处理程序,ChannelOutboundHandler是用来处理Outbound(出站)事件的处理程序。

HTTP请求及常见的Content-Type类型

1、application/x-www-form-urlencoded, POST提交数据的方式,原生Form表单,如果不设置enctype属性的默认方式;

2、multipart/form-data,POST提交数据的方式,Form表单的enctype设置为multipart/form-data,表单的数据被处理为一条消息,以标签为单元,用分隔符(这就是boundary的作用)分开。这种方式将数据分成多个部分,既可以上传键值对,也可以上传文件,甚至多个文件。

3、application/json,JSON格式。

4、binary (application/octet-stream),只能提交二进制。

实现HTTP服务器案例

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.60.Final</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.15.6</version>
</dependency>
packagecom.what21.netty.http;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.ChannelFuture;importio.netty.channel.EventLoopGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.nio.NioServerSocketChannel;publicclassNettyHttpServer{publicstaticvoidmain(String[] args){// 访问地址:http://127.0.0.1:8080/intserverPort =8080;// 初始化==>用于Acceptor的主"线程池"EventLoopGroup bossEventGroup =newNioEventLoopGroup();// 初始化==>用于I/O工作的从"线程池"NioEventLoopGroup workerEventGroup =newNioEventLoopGroup();try{
            ServerBootstrap serverBootstrap =newServerBootstrap();// group方法设置主从线程池serverBootstrap.group(bossEventGroup, workerEventGroup)// 指定通道channel类型,服务端为:NioServerSocketChannel.channel(NioServerSocketChannel.class)
                    // 添加Handler.childHandler(newNettyHttpServerInitializer());// 绑定并侦听端口ChannelFuture channelFuture = serverBootstrap.bind(serverPort).sync();// 等待服务监听端口关闭channelFuture.channel().closeFuture().sync();
        }catch(InterruptedException e) {
            e.printStackTrace();
        }finally{// 优雅退出,释放"线程池"bossEventGroup.shutdownGracefully();
            workerEventGroup.shutdownGracefully();
        }
    }
}
packagecom.what21.netty.http;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelPipeline;importio.netty.channel.socket.SocketChannel;importio.netty.handler.codec.http.HttpServerCodec;/**
 * netty 实现简单的 http 协议:配置 解码器、handler
 */publicclassNettyHttpServerInitializerextendsChannelInitializer<SocketChannel>{@OverrideprotectedvoidinitChannel(SocketChannel socketChannel)throwsException{
        ChannelPipeline pipeline = socketChannel.pipeline();// 管道中添加 netty 提供的 http 编解码器// HttpServerCodec==>http编解码的处理类pipeline.addLast("HttpServerCodec",newHttpServerCodec());
        pipeline.addLast("MyNettyHttpServerHandler",newNettyHttpServerHandler());
    }

}
package com.what21.netty.http;importio.netty.buffer.ByteBuf;importio.netty.buffer.Unpooled;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.SimpleChannelInboundHandler;importio.netty.handler.codec.http.*;importio.netty.util.CharsetUtil;importjava.net.URI;/**
 * netty 实现简单的 http:handler
 */publicclassNettyHttpServerHandlerextendsSimpleChannelInboundHandler<HttpObject>{/**
     * 触发读取事件
     *
     * @param channelHandlerContext 通道上下文
     * @param httpObject            http数据
     * @throws Exception 异常
     */@Overrideprotected void channelRead0(ChannelHandlerContextchannelHandlerContext,HttpObjecthttpObject)throwsException{if(httpObject instanceofHttpRequest) {System.out.println("[服务端] 数据类型: "+ httpObject.getClass());System.out.println("[服务端] 客户端地址: "+ channelHandlerContext.channel().remoteAddress());HttpRequesthttpRequest = (HttpRequest) httpObject;finalURIuri = newURI(httpRequest.uri());if("/favico.ico".equals(uri.getPath())) {System.out.println("[服务端]请求了 favico.ico,不做处理 ");return;
            }ByteBufbyteBuf =Unpooled.copiedBuffer("[http helloworld]",CharsetUtil.UTF_8);FullHttpResponseresponse = newDefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK, byteBuf);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=utf-8");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());

            channelHandlerContext.writeAndFlush(response);
        }
    }

}

实现简单的TCP通信案例

服务器端:

packagecom.what21.netty.channel.demo;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.ChannelFuture;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelOption;importio.netty.channel.EventLoopGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioServerSocketChannel;publicclassNettyTcpServer{publicstaticvoidmain(String[] args){// 初始化==>用于Acceptor的主"线程池"EventLoopGroup bossGroup =newNioEventLoopGroup();// 初始化==>用于I/O工作的从"线程池"NioEventLoopGroup workerGroup =newNioEventLoopGroup();try{
            ServerBootstrap serverBootstrap =newServerBootstrap();// group方法设置主从线程池serverBootstrap.group(bossGroup, workerGroup)// 使用 NioServerSocketChannel作为服务器的通道实现.channel(NioServerSocketChannel.class)
                    // 设置 线程队列 等待 连接的 个数
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // 设置保持 活动/生存的 连接状态
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    // 设置管道工厂
                    .childHandler(newChannelInitializer<SocketChannel>(){@OverrideprotectedvoidinitChannel(SocketChannel socketChannel)throwsException{
                            socketChannel.pipeline().addLast(newNettyTcpServerHandler());
                        }
                    });
            System.out.println("[服务器]启动成功");// 绑定并侦听端口ChannelFuture channelFuture = serverBootstrap.bind(6688).sync();// 等待服务监听端口关闭channelFuture.channel().closeFuture().sync();
        }catch(InterruptedException e) {
            e.printStackTrace();
        }finally{// 优雅退出,释放"线程池"bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}
packagecom.what21.netty.channel.demo;importio.netty.buffer.ByteBuf;importio.netty.buffer.Unpooled;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandler;importio.netty.util.CharsetUtil;importlombok.extern.slf4j.Slf4j;importjava.util.concurrent.TimeUnit;@Slf4jpublicclassNettyTcpServerHandlerimplementsChannelInboundHandler{@OverridepublicvoidchannelRegistered(ChannelHandlerContext ctx)throwsException{
        System.out.println("channelRegistered()"+ ctx);
    }@OverridepublicvoidchannelUnregistered(ChannelHandlerContext ctx)throwsException{
        System.out.println("channelUnregistered()"+ ctx);
    }/**
     * 客户端发起连接,服务端通道就绪,触发本方法
     *
     *@paramctx 通道
     *@throwsException 异常
     */@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{
        System.out.println("channelActive()"+ ctx);
        System.out.println("[服务端]通道连接准备就绪");
    }@OverridepublicvoidchannelInactive(ChannelHandlerContext var1)throwsException{
        System.out.println("channelInactive()"+ var1);
    }/**
     * 客户端连接后发送数据,服务端接收数据时触发
     *
     *@paramctx 通道
     *@parammsg 数据
     *@throwsException 异常
     */@OverridepublicvoidchannelRead(ChannelHandlerContext ctx, Object msg)throwsException{
        System.out.println("channelRead()"+ ctx);// 用户 自定义 普通任务ctx.channel().eventLoop().execute(() -> {try{
                TimeUnit.SECONDS.sleep(2);
                ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]客户端处理耗时阻塞", CharsetUtil.UTF_8));
            }catch(InterruptedException e) {
                e.printStackTrace();
            }
        });// 自定义 定时任务 提交到 ScheduledTaskQueue队列中。ctx.channel().eventLoop().schedule(() -> {try{
                TimeUnit.SECONDS.sleep(2);
                ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]客户端定时任务", CharsetUtil.UTF_8));
            }catch(InterruptedException e) {
                e.printStackTrace();
            }
        },5, TimeUnit.SECONDS);// 读取 客户端上传的 信息System.out.println("[服务端]服务端读取线程:"+ Thread.currentThread().getName());
        ByteBuf byteBuffer = (ByteBuf) msg;
        String message = byteBuffer.toString(CharsetUtil.UTF_8);
        System.out.println("[服务端]服务端接收数据:"+ message);
    }/**
     * 服务端数据读取结束后触发
     *
     *@paramctx 通道
     *@throwsException 异常
     */@OverridepublicvoidchannelReadComplete(ChannelHandlerContext ctx)throwsException{
        System.out.println("channelReadComplete()"+ ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]数据接收完毕", CharsetUtil.UTF_8));
    }@OverridepublicvoiduserEventTriggered(ChannelHandlerContext ctx, Object msg)throwsException{
        System.out.println("userEventTriggered()"+ ctx);
    }@OverridepublicvoidchannelWritabilityChanged(ChannelHandlerContext ctx)throwsException{
        System.out.println("channelWritabilityChanged()"+ ctx);
    }@OverridepublicvoidhandlerAdded(ChannelHandlerContext ctx)throwsException{
        System.out.println("handlerAdded()"+ ctx);
    }@OverridepublicvoidhandlerRemoved(ChannelHandlerContext ctx)throwsException{
        System.out.println("handlerRemoved()"+ ctx);
    }/**
     * 发生异常,关闭通道
     *
     *@paramctx   通道
     *@paramcause 原因
     *@throwsException 异常
     */@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause)throwsException{
        ctx.close();
    }

}

客户端实现:

packagecom.what21.netty.channel.demo;importio.netty.bootstrap.Bootstrap;importio.netty.channel.ChannelFuture;importio.netty.channel.ChannelInitializer;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioSocketChannel;/**
 * netty 实现 tcp服务
 */publicclassNettyTcpClient{publicstaticvoidmain(String[] args){// 初始化==>用于I/O工作的从"线程池"finalNioEventLoopGroup eventLoopGroup =newNioEventLoopGroup();try{
            Bootstrap bootstrap =newBootstrap();// group方法设置线程池bootstrap.group(eventLoopGroup)// 设置客户端 通道 是 NioSocketChannel 类型.channel(NioSocketChannel.class)
                    // 设置管道工厂
                    .handler(newChannelInitializer<SocketChannel>(){@OverrideprotectedvoidinitChannel(SocketChannel socketChannel)throwsException{
                            socketChannel.pipeline().addLast(newNettyTcpClientHandler());
                        }
                    });
            System.out.println("[客户端]启动成功");// 连接ChannelFuture channelFuture = bootstrap.connect("localhost",6688);
            channelFuture.channel().closeFuture().sync();
        }catch(InterruptedException e) {
            e.printStackTrace();
        }finally{// 优雅退出,释放"线程池"eventLoopGroup.shutdownGracefully();
        }

    }

}
packagecom.what21.netty.channel.demo;importio.netty.buffer.ByteBuf;importio.netty.buffer.Unpooled;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandler;importio.netty.util.CharsetUtil;importlombok.extern.slf4j.Slf4j;@Slf4jpublicclassNettyTcpClientHandlerimplementsChannelInboundHandler{@OverridepublicvoidchannelRegistered(ChannelHandlerContext ctx)throwsException{
        System.out.println("channelRegistered()"+ ctx);
    }@OverridepublicvoidchannelUnregistered(ChannelHandlerContext ctx)throwsException{
        System.out.println("channelUnregistered()"+ ctx);
    }@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{
        System.out.println("channelActive()"+ ctx);
        System.out.println("[客户端]通道就绪");
        ctx.writeAndFlush(Unpooled.copiedBuffer("[这儿有内鬼,终止交易]", CharsetUtil.UTF_8));
    }@OverridepublicvoidchannelInactive(ChannelHandlerContext ctx)throwsException{
        System.out.println("channelInactive()"+ ctx);
    }/**
     *@paramctx 通道
     *@parammsg 数据
     *@throwsException 异常
     */@OverridepublicvoidchannelRead(ChannelHandlerContext ctx, Object msg)throwsException{
        System.out.println("channelRead()"+ ctx);
        ByteBuf byteBuf = (ByteBuf) msg;
        String message = byteBuf.toString(CharsetUtil.UTF_8);
        System.out.println("[客户端]服务端地址:"+ ctx.channel().remoteAddress() +",服务端回复信息:"+ message);
    }@OverridepublicvoidchannelReadComplete(ChannelHandlerContext ctx)throwsException{
        System.out.println("channelReadComplete()"+ ctx);
    }@OverridepublicvoiduserEventTriggered(ChannelHandlerContext ctx, Object msg)throwsException{
        System.out.println("userEventTriggered()"+ ctx);
    }@OverridepublicvoidchannelWritabilityChanged(ChannelHandlerContext ctx)throwsException{
        System.out.println("userEventTriggered()"+ ctx);
    }@OverridepublicvoidhandlerAdded(ChannelHandlerContext ctx)throwsException{
        System.out.println("handlerAdded()"+ ctx);
    }@OverridepublicvoidhandlerRemoved(ChannelHandlerContext ctx)throwsException{
        System.out.println("handlerRemoved()"+ ctx);
    }/**
     * 发生异常,关闭通道
     *
     *@paramctx   通道
     *@paramcause 原因
     *@throwsException 异常
     */@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause)throwsException{
        System.out.println("exceptionCaught()"+ ctx);
        cause.printStackTrace();
        ctx.close();
    }

}

声明:本文部分素材转载自互联网,如有侵权立即删除 。

© 版权声明
THE END
喜欢就支持一下吧
点赞0赞赏 分享
相关推荐
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容