1 模擬單機連接瓶頸
我們知道,通常啟動一個服務端會系結一個埠,例如8000埠,當然客戶端連接埠是有限制的,除去最大埠65535和默認的1024埠及以下的埠,就只剩下1 024~65 535個,再扣除一些常用埠,實際可用埠只有6萬個左右,那么,我們如何實作單機百萬連接呢?
假設在服務端啟動[8 000,8 100)這100個埠,100×6萬就可以實作600萬左右的連接,這是TCP的一個基礎知識,雖然對于客戶端來說是同一個埠號,但是對于服務端來說是不同的埠號,由于TCP是一個私源組概念,也就是說它是由源IP地址、源埠號、目的IP地址和目的埠號確定的,當源IP地址和源埠號是一樣的,但是目的埠號不一樣,那么最終系統底層會把它當作兩條TCP連接來處理,所以這里取巧給服務端開啟了100個埠號,這就是單機百萬連接的準備作業,如下圖所示,

單機1024及以下的埠只能給ROOT保留使用,客戶端埠范圍為1 025~65 535,接下來用代碼實作單機百萬連接的模擬場景,先看服務端類,回圈開啟[8 000~8 100)這100個監聽埠,等待客戶端連接,下面已Netty為例撰寫代碼如下,
package com.tom.netty.connection;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author Tom
*/
public final class Server {
public static final int BEGIN_PORT = 8000;
public static final int N_PORT = 8100;
public static void main(String[] args) {
new Server().start(Server.BEGIN_PORT, Server.N_PORT);
}
public void start(int beginPort, int nPort) {
System.out.println("服務端啟動中...");
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
bootstrap.childHandler(new ConnectionCountHandler());
for (int i = 0; i <= (nPort - beginPort); i++) {
final int port = beginPort + i;
bootstrap.bind(port).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("成功系結監聽埠: " + port);
}
});
}
System.out.println("服務端已啟動!");
}
}
然后看ConnectionCountHandler類的實作邏輯,主要用來統計單位時間內的請求數,每接入一個連接則自增一個數字,每2s統計一次,代碼如下,
package com.tom.netty.connection;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by Tom.
*/
@ChannelHandler.Sharable
public class ConnectionCountHandler extends ChannelInboundHandlerAdapter {
private AtomicInteger nConnection = new AtomicInteger();
public ConnectionCountHandler() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("當前客戶端連接數: " + nConnection.get());
}
},0, 2, TimeUnit.SECONDS);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
nConnection.incrementAndGet();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
nConnection.decrementAndGet();
}
}
再看客戶端類代碼,主要功能是回圈依次往服務端開啟的100個埠發起請求,直到服務端無回應、執行緒掛起為止,代碼如下,
package com.tom.netty.connection;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Created by Tom.
*/
public class Client {
private static final String SERVER_HOST = "127.0.0.1";
public static void main(String[] args) {
new Client().start(Server.BEGIN_PORT, Server.N_PORT);
}
public void start(final int beginPort, int nPort) {
System.out.println("客戶端已啟動...");
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
}
});
int index = 0;
int port;
while (!Thread.interrupted()) {
port = beginPort + index;
try {
ChannelFuture channelFuture = bootstrap.connect(SERVER_HOST, port);
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
System.out.println("連接失敗,程式關閉!");
System.exit(0);
}
}
});
channelFuture.get();
} catch (Exception e) {
}
if (port == nPort) { index = 0; }else { index ++; }
}
}
}
最后,將服務端程式打包發布到Linux服務器上,同樣將客戶端程式打包發布到另一臺Linux服務器上,接下來分別啟動服務端和客戶端程式,運行一段時間之后,會發現服務端監聽的連接數定格在一個值不再變化,如下所示,
當前客戶端連接數: 870
當前客戶端連接數: 870
當前客戶端連接數: 870
當前客戶端連接數: 870
當前客戶端連接數: 870
當前客戶端連接數: 870
當前客戶端連接數: 870
當前客戶端連接數: 870
當前客戶端連接數: 870
...
并且拋出如下例外,
Exception in thread "nioEventLoopGroup-2-1" java.lang.InternalError: java.io.FileNotFoundException: /usr/java/jdk1.8.0_121/jre/lib/ext/cldrdata.jar (Too many open files)
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1040)
at sun.misc.URLClassPath.getResource(URLClassPath.java:239)
at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.util.ResourceBundle$RBClassLoader.loadClass(ResourceBundle.java:503)
at java.util.ResourceBundle$Control.newBundle(ResourceBundle.java:2640)
at java.util.ResourceBundle.loadBundle(ResourceBundle.java:1501)
at java.util.ResourceBundle.findBundle(ResourceBundle.java:1465)
at java.util.ResourceBundle.findBundle(ResourceBundle.java:1419)
at java.util.ResourceBundle.getBundleImpl(ResourceBundle.java:1361)
at java.util.ResourceBundle.getBundle(ResourceBundle.java:845)
at java.util.logging.Level.computeLocalizedLevelName(Level.java:265)
at java.util.logging.Level.getLocalizedLevelName(Level.java:324)
at java.util.logging.SimpleFormatter.format(SimpleFormatter.java:165)
at java.util.logging.StreamHandler.publish(StreamHandler.java:211)
at java.util.logging.ConsoleHandler.publish(ConsoleHandler.java:116)
at java.util.logging.Logger.log(Logger.java:738)
at io.netty.util.internal.logging.JdkLogger.log(JdkLogger.java:606)
at io.netty.util.internal.logging.JdkLogger.warn(JdkLogger.java:482)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run (SingleThreadEventExecutor.java:876)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run (DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
這個時候,我們就應該要知道,這已經是服務器所能接受客戶端連接數量的瓶頸值,也就是服務端最大支持870個連接,接下來要做的事情是想辦法突破這個瓶頸,讓單臺服務器也能支持100萬連接,這是一件多么激動人心的事情,
2 單機百萬連接調優解決思路
2.1 突破區域檔案句柄限制
首先在服務端輸入命令,看一下單個行程所能支持的最大句柄數,
ulimit -n
輸入命令后,會出現1 024的數字,表示Linux系統中一個行程能夠打開的最大檔案數,由于開啟一個TCP連接就會在Linux系統中對應創建一個檔案,所以就是受這個檔案的最大檔案數限制,那為什么前面演示的服務端連接數最終定格在870,比1 024小呢?其實是因為除了連接數,還有JVM打開的檔案Class類也算作行程內打開的檔案,所以,1 024減去JVM打開的檔案數剩下的就是TCP所能支持的連接數,
接下來想辦法突破這個限制,首先在服務器命令列輸入以下命令,打開/etc/security/limits.conf檔案,
sudo vi /etc/security/limits.conf
然后在這個檔案末尾加上下面兩行代碼,
* hard nofile 1000000
* soft nofile 1000000
前面的*表示當前用戶,hard和soft分別表示限制和警告限制,nofile表示最大的檔案數標識,后面的數字1 000 000表示任何用戶都能打開100萬個檔案,這也是作業系統所能支持的最大值,如下圖所示,

接下來,輸入以下命令,
ulimit -n
這時候,我們發現還是1 024,沒變,重啟服務器,將服務端程式和客戶端程式分別重新運行,這時候只需靜靜地觀察連接數的變化,最終連接數停留在137 920,同時拋出了例外,如下所示,
當前客戶端連接數: 137920
當前客戶端連接數: 137920
當前客戶端連接數: 137920
當前客戶端連接數: 137920
當前客戶端連接數: 137920
Exception in thread "nioEventLoopGroup-2-1" java.lang.InternalError: java.io.FileNotFoundException: /usr/java/jdk1.8.0_121/jre/lib/ext/cldrdata.jar (Too many open files)
...
這又是為什么呢?肯定還有地方限制了連接數,想要突破這個限制,就需要突破全域檔案句柄數的限制,
2.2 突破全域檔案句柄限制
首先在Linux命令列輸入以下命令,可以查看Linux系統所有用戶行程所能打開的檔案數,
cat /proc/sys/fs/file-max
通過上面這個命令可以看到全域的限制,發現得到的結果是10 000,可想而知,區域檔案句柄數不能大于全域的檔案句柄數,所以,必須將全域的檔案句柄數限制調大,突破這個限制,首先切換為ROOT用戶,不然沒有權限,
sudo -s
echo 2000> /proc/sys/fs/file-max
exit
我們改成20 000來測驗一下,繼續試驗,分別啟動服務端程式和客戶端程式,發現連接數已經超出了20 000的限制,
前面使用echo來配置/proc/sys/fs/file-max的話,重啟服務器就會失效,還會變回原來的10 000,因此,直接用vi命令修改,輸入以下命令列,
sodu vi /etc/sysctl.conf
在/etc/sysctl.conf檔案末尾加上下面的內容,
fs.file-max=1000000
結果如下圖所示,

接下來重啟 Linux服務器,再啟動服務端程式和客戶端程式,
當前客戶端連接數: 9812451
當前客戶端連接數: 9812462
當前客戶端連接數: 9812489
當前客戶端連接數: 9812501
當前客戶端連接數: 9812503
...
最終連接數定格在 98萬左右,我們發現主要受限于本機本身的性能,用htop命令查看一下,發現CPU都接近100%,如下圖所示,

以上是作業系統層面的調優和性能提升,下面主要介紹基于Netty應用層面的調優,
3 Netty應用級別的性能調優
3.1 Netty應用級別的性能瓶頸復現
首先來看一下應用場景,下面是一段標準的服務端應用程式代碼,
package com.tom.netty.thread;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
/**
* Created by Tom.
*/
public class Server {
private static final int port = 8000;
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EventLoopGroup businessGroup = new NioEventLoopGroup(1000);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_REUSEADDR, true);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
//自定義長度的解碼,每次發送一個long型別的長度資料
//每次傳遞一個系統的時間戳
ch.pipeline().addLast(new FixedLengthFrameDecoder(Long.BYTES));
ch.pipeline().addLast(businessGroup, ServerHandler.INSTANCE);
}
});
ChannelFuture channelFuture = bootstrap.bind(port).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("服務端啟動成功,系結埠為: " + port);
}
});
}
}
我們重點關注服務端的邏輯處理ServerHandler類,
package com.tom.netty.thread;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.concurrent.ThreadLocalRandom;
/**
* Created by Tom.
*/
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
public static final ChannelHandler INSTANCE = new ServerHandler();
//channelread0是主執行緒
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
ByteBuf data = https://www.cnblogs.com/gupaoedu-tom/p/Unpooled.directBuffer();
//從客戶端讀一個時間戳
data.writeBytes(msg);
//模擬一次業務處理,有可能是資料庫操作,也有可能是邏輯處理
Object result = getResult(data);
//重新寫回給客戶端
ctx.channel().writeAndFlush(result);
}
//模擬去資料庫獲取一個結果
protected Object getResult(ByteBuf data) {
int level = ThreadLocalRandom.current().nextInt(1, 1000);
//計算出每次回應需要的時間,用來作為QPS的參考資料
//90.0% == 1ms 1000 100 > 1ms
int time;
if (level <= 900) {
time = 1;
//95.0% == 10ms 1000 50 > 10ms
} else if (level <= 950) {
time = 10;
//99.0% == 100ms 1000 10 > 100ms
} else if (level <= 990) {
time = 100;
//99.9% == 1000ms 1000 1 > 1000ms
} else {
time = 1000;
}
try {
Thread.sleep(time);
} catch (InterruptedException e) {
}
return data;
}
}
上面代碼中有一個getResult()方法,可以把getResult()方法看作是在資料庫中查詢資料的一個方法,把每次查詢的結果回傳給客戶端,實際上,為了模擬查詢資料性能,getResult()傳入的引數是由客戶端傳過來的時間戳,最侄訓傳的還是客戶端傳過來的值,只不過回傳之前做了一次隨機的執行緒休眠處理,以模擬真實的業務處理性能,如下表所示是模擬場景的性能引數,
| 資料處理的業務介面占比 | 處理所耗的時間 |
|---|---|
| 90% | 1ms |
| 95% | 10ms |
| 99% | 100ms |
| 99.9% | 1000ms |
下面來看客戶端,也是一段標準的代碼,
package com.tom.netty.thread;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
/**
* Created by Tom.
*/
public class Client {
private static final String SERVER_HOST = "127.0.0.1";
public static void main(String[] args) throws Exception {
new Client().start(8000);
}
public void start(int port) throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new FixedLengthFrameDecoder(Long.BYTES));
ch.pipeline().addLast(ClientHandler.INSTANCE);
}
});
//客戶端每秒鐘向服務端發起1 000次請求
for (int i = 0; i < 1000; i++) {
bootstrap.connect(SERVER_HOST, port).get();
}
}
}
從上面代碼中看到,客戶端會向服務端發起1 000次請求,重點來看客戶端邏輯處理ClientHandler類,
package com.tom.netty.thread;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* Created by Tom.
*/
@ChannelHandler.Sharable
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
public static final ChannelHandler INSTANCE = new ClientHandler();
private static AtomicLong beginTime = new AtomicLong(0);
//總回應時間
private static AtomicLong totalResponseTime = new AtomicLong(0);
//總請求數
private static AtomicInteger totalRequest = new AtomicInteger(0);
public static final Thread THREAD = new Thread(){
@Override
public void run() {
try {
while (true) {
long duration = System.currentTimeMillis() - beginTime.get();
if (duration != 0) {
System.out.println("QPS: " + 1000 * totalRequest.get() / duration + ", " + "平均回應時間: " + ((float) totalResponseTime.get()) / totalRequest.get() + "ms.");
Thread.sleep(2000);
}
}
} catch (InterruptedException ignored) {
}
}
};
@Override
public void channelActive(final ChannelHandlerContext ctx) {
ctx.executor().scheduleAtFixedRate(new Runnable() {
public void run() {
ByteBuf byteBuf = ctx.alloc().ioBuffer();
//將當前系統時間發送到服務端
byteBuf.writeLong(System.currentTimeMillis());
ctx.channel().writeAndFlush(byteBuf);
}
}, 0, 1, TimeUnit.SECONDS);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
//獲取一個回應時間差,本次請求的回應時間
totalResponseTime.addAndGet(System.currentTimeMillis() - msg.readLong());
//每次自增
totalRequest.incrementAndGet();
if (beginTime.compareAndSet(0, System.currentTimeMillis())) {
THREAD.start();
}
}
}
上面代碼主要模擬了Netty真實業務環境下的處理耗時情況,QPS大概在1 000次,每2s統計一次,接下來,啟動服務端和客戶端查看控制臺日志,首先運行服務端,看到控制臺日志如下圖所示,

然后運行客戶端,看到控制臺日志如下圖所示,一段時間之后,發現QPS保持在1 000次以內,平均回應時間越來越長,


回到服務端ServerHander的getResul()方法,在getResult()方法中有執行緒休眠導致阻塞,不難發現,它最侄訓阻塞主執行緒,導致所有的請求擠壓在一個執行緒中,如果把下面的代碼放入執行緒池中,效果將完全不同,
Object result =getResult(data);
ctx.channel().wrteAndFlush(result);
把這兩行代碼放到業務執行緒池里,不斷在后臺運行,運行完成后即時回傳結果,
3.2 Netty應用級別的性能調優方案
下面來改造一下代碼,在服務端的代碼中新建一個ServerThreadPoolHander類,
package com.tom.netty.thread;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by Tom.
*/
@ChannelHandler.Sharable
public class ServerThreadPoolHandler extends ServerHandler {
public static final ChannelHandler INSTANCE = new ServerThreadPoolHandler();
private static ExecutorService threadPool = Executors.newFixedThreadPool(1000);
@Override
protected void channelRead0(final ChannelHandlerContext ctx, ByteBuf msg) {
final ByteBuf data = https://www.cnblogs.com/gupaoedu-tom/p/Unpooled.directBuffer();
data.writeBytes(msg);
threadPool.submit(new Runnable() {
public void run() {
Object result = getResult(data);
ctx.channel().writeAndFlush(result);
}
});
}
}
然后在服務端的Handler處理注冊為ServerThreadPoolHander,洗掉原來的ServerHandler,代碼如下,
ch.pipeline().addLast(ServerThreadPoolHandler.INSTANCE);
隨后,啟動服務端和客戶端程式,查看控制臺日志,如下圖所示,

最終耗時穩定在15ms左右,QPS也超過了1 000次,實際上這個結果還不是最優的狀態,繼續調整,將ServerThreadPoolHander的執行緒個數調整到20,代碼如下,
public static final ChannelHandler INSTANCE = new ServerThreadPoolHandler();
private static ExecutorService threadPool = Executors.newFixedThreadPool(20);
然后啟動程式,發現平均回應時間相差也不是太多,如下圖所示,

由此得出的結論是:具體的執行緒數需要在真實的環境下不斷地調整、測驗,才能確定最合適的數值,本章旨在告訴大家優化的方法,而不是結果,
本文為“Tom彈架構”原創,轉載請注明出處,技術在于分享,我分享我快樂!
如果本文對您有幫助,歡迎關注和點贊;如果您有任何建議也可留言評論或私信,您的支持是我堅持創作的動力,關注微信公眾號『 Tom彈架構 』可獲取更多技術干貨!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/336111.html
標籤:Java
上一篇:檔案隨機或順序讀寫原理深入淺出
下一篇:Java后端學習路線梳理
