企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
* 非阻塞 IO * 其实无论是用 Java NIO 还是用 Netty,达到百万连接都没有任何难度。因为它们都是非阻塞的 IO,不需要为每个连接创建一个线程了。 * nio 和 io的区别 * nio的核心:Channel 通道,Buffer 缓冲区,Selector 选择器如果是百万级测试,怎么去找那么多机器?单机最多可以有 6W 的连接,百万连接起码需要17台机器! ### 如何才能突破这个限制呢? * 其实这个限制来自于网卡。 可以通过使用虚拟机,并且把虚拟机的虚拟网卡配置成了桥接模式解决了问题。 根据物理机内存大小,单个物理机起码可以跑4-5个虚拟机,所以最终百万连接只要4台物理机就够了。 ### 端口受限设置 #### windows客户端 * 打开注册表:regedit HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\ Services\TCPIP\Parameters 新建 DWORD值,name:TcpTimedWaitDelay,value:0(十进制) –> 设置为0 新建 DWORD值,name:MaxUserPort,value:65534(十进制) –> 设置最大连接数65534 重启系统 #### liunx客户端 ``` shell cat /proc/sys/net/ipv4/ip_local_port_range 值为32768 610 ``` * 大概也就是共61000-32768=28232个端口可以使用,单个IP对外只能发送28232个TCP请求。 * 以管理员身份,把端口的范围区间增到最大: ``` shell echo "1024 65535"> /proc/sys/net/ipv4/ip_local_port_range ``` 现在有64511个端口可用. * 以上做法只是临时,系统下次重启,会还原。 更为稳妥的做法是修改/etc/sysctl.conf文件,增加一行内容 ``` net.ipv4.ip_local_port_range= 1024 65535 sysctl -p ``` * 现在可以使用的端口达到64510个(假设系统所有运行的服务器是没有占用大于1024的端口的,较为纯净的centos系统可以做到),要想达到50万请求,还得再想办法。 增加IP地址 * 一般假设本机网卡名称为 eth0,那么手动再添加几个虚拟的IP: ifconfig eth0:1 192.168.190.151 ifconfig eth0:2 192.168.190.152 ...... 或者偷懒一些: for i in seq 1 9; do ifconfig eth0:$i 192.168.190.15$i up ; done 这些虚拟的IP地址,一旦重启,或者 service network restart 就会丢失。 为了模拟较为真实环境,在测试端,手动再次添加9个vmware虚拟机网卡 ### 代码 #### 入口 ``` package com.nio.test; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpRequestEncoder; import io.netty.handler.codec.http.HttpResponseDecoder; import io.netty.handler.codec.http.HttpVersion; import java.net.URI; import java.nio.charset.StandardCharsets; public class HttpClient { public void connect(String host, int port) throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码 ch.pipeline().addLast(new HttpResponseDecoder()); // 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码 ch.pipeline().addLast(new HttpRequestEncoder()); ch.pipeline().addLast(new HttpClientInboundHandler()); } }); ChannelFuture f = b.connect(host, port).sync(); URI uri = new URI("http://gc.ditu.aliyun.com:80/geocoding?a=深圳市"); String msg = "Are you ok?"; DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8"))); // 构建http请求 request.headers().set(HttpHeaders.Names.HOST, host); request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes()); // 发送http请求 f.channel().write(request); f.channel().flush(); f.channel().closeFuture().sync(); // } } finally { workerGroup.shutdownGracefully(); } } public void connect_post(String host, int port) throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码 ch.pipeline().addLast(new HttpResponseDecoder()); // 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码 ch.pipeline().addLast(new HttpRequestEncoder()); ch.pipeline().addLast(new HttpClientInboundHandler()); } }); ChannelFuture f = b.connect(host, port).sync(); URI uri = new URI("http://gc.ditu.aliyun.com:80/geocoding?a=深圳市"); FullHttpRequest request = new DefaultFullHttpRequest( HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath()); // 构建http请求 request.headers().set(HttpHeaders.Names.HOST, host); request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); // or HttpHeaders.Values.CLOSE request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); request.headers().add(HttpHeaders.Names.CONTENT_TYPE, "application/json"); ByteBuf bbuf = Unpooled.copiedBuffer("{\"jsonrpc\":\"2.0\",\"method\":\"calc.add\",\"params\":[1,2],\"id\":1}", StandardCharsets.UTF_8); request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bbuf.readableBytes()); // 发送http请求 f.channel().write(request); f.channel().flush(); f.channel().closeFuture().sync(); // } } finally { workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { HttpClient client = new HttpClient(); //请自行修改成服务端的IP for(int k=0; k<1;k++){ client.connect("http://gc.ditu.aliyun.com/", 80); System.out.println(k); } } } ``` #### 监听代码 ``` package com.nio.test; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponse; public class HttpClientInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.print("success!~!~"); if (msg instanceof HttpResponse) { HttpResponse response = (HttpResponse) msg; // System.out.println("CONTENT_TYPE:" + response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); System.out.println("HTTP_CODE:" + response.getStatus()); } if(msg instanceof HttpContent) { HttpContent content = (HttpContent)msg; ByteBuf buf = content.content(); System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8)); buf.release(); } ctx.close(); } } ``` ### 更多参考 * [netty官网](http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-10) * [端口受限](http://blog.csdn.net/educast/article/details/23053671) * [Netty 长连接服务测试](http://www.dozer.cc/2014/12/netty-long-connection.html1)