NIO目錄
- 一、NIO編程
- 二、NIO類別庫和相關概念
- 緩沖區Buffer
- 通道Channel
- 多路復用器Selector
- 三、NIO服務端流程分析
- 四、NIO客戶端流程分析
- 五、NIO編程的優點
- 原始碼
- TimeServer
- MultiplexerTimeServer
- TimeClient
- TimeClientHandle
一、NIO編程
NIO有兩種叫法:有人稱之為New I/O;更多的人喜歡稱之為Non-block I/O:非阻塞I/O,后者更能體現NIO的特點,
與Socket和ServerSocket類相對應,NIO也提供了SocketChannel和ServerSocketChannel兩種不同的套接字通道實作,這兩種新增的通道都支持阻塞與非阻塞兩種模式,一般來說:低負載、低并發的應用程式可以選擇同步阻塞I/O以降低編程復雜度;對于高負載、高并發的應用程式,需要使用NIO的非阻塞模式進行開發,
二、NIO類別庫和相關概念
在Java1.4之前的早期版本,Java對I/O的支持并不完善,開發人員在開發高性能I/O程式的時候,會面臨一些巨大的挑戰和困難,主要問題如下:
- 沒有資料緩沖區,I/O性能存在問題,
- 沒有Channel通道的概念,只有輸入和輸出流,
- 同步阻塞式I/O通信(BIO),通常會導致通信執行緒被長時間阻塞,
- 支持的字符集有限,硬體可移植性不好,
NIO 主要有三大核心部分:Channel(通道),Buffer(緩沖區), Selector,傳統 IO 基于位元組流和字 符流進行操作,而 NIO 基于 Channel 和 Buffer(緩沖區)進行操作,資料總是從通道讀取到緩沖區中,或者從緩沖區寫入到通道中,
NIO 和傳統 IO 之間第一個最大的區別是,IO 是面向流的,NIO 是面向緩沖區的,
緩沖區Buffer
Buffer是一個物件,它包含一些要寫入或者要讀出的資料,在NIO類別庫中加入Buffer物件,體現了新庫與原來I/O的一個重要區別,在面向流的IO中,可以將資料直接寫入或者將資料直接得到Stream物件中,
在NIO庫中,所有資料都是用緩沖區處理的,在讀取資料時,他是直接讀到緩沖區的;在寫入資料時,寫入帶緩沖區中,任何時候訪問NIO中的資料都是通過緩沖區進行操作,
緩沖區實質上是一個陣列,通常它是一個位元組陣列(ByteBuffer),也可以使用其他種類的陣列,但是一個緩沖區不僅僅是一個陣列,緩沖區提供了對資料的結構化訪問以及維護讀寫位置(limit)等資訊,
最常用的是ByteBuffer,一個ByteBuffer提供了一組功能用于操作byte陣列,每一種Java基本型別都對應有一種緩沖區(除了Boolean),
ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer,
通道Channel
Channel是一個通道,它就像一個自來水管一樣,網路資料通過Channel都讀取和寫入,通道和流的不同之處在于通道是雙向的,流只是在一個方向上移動(一個流必須是InputStream或者OutputStream的子類),而通道可以用于讀、寫或者二者同時進行,Channel是雙全工的,同時支持讀寫操作,
多路復用器Selector
多路復用器Selector是JavaNIO編程的基礎,簡單來講:Selector會不斷輪詢注冊在其上的Channel,如果某個Channel上面發生讀或寫事件,這個Channel就處于就緒狀態,會被Selector輪詢出來,然后通過SelectorKey可以獲取就緒Channel的集合,進行后續的I/O操作,
一個多路復用器Selector可以同時輪詢多個Channel,只需要一個執行緒負責Selector的輪詢,就可以接入成百上千的客戶端,
三、NIO服務端流程分析
- 打開
ServerSocketChannel,用于監聽客戶端的連接,它是所有客戶端連接的父管道, - 系結監聽埠,設定連接為非阻塞模式,
- 創建Reactor執行緒創建多路復用器
Selector并啟動執行緒, - 將
ServerSocketChannel注冊到Reactor執行緒的多路復用器Selector上,監聽ACCEPT事件, - 多路復用器Selector在執行緒run方法的無限回圈體內輪詢準備就緒的Key,
- 多路復用器Selector監聽到有新的客戶端接入,處理新的請求,完成TCP三次握手,建立物理鏈路,
- 設定客戶端鏈路為非阻塞模式,
- 將新接入的客戶端連接注冊到Reactor執行緒的多路復用器上,監聽
讀操作, - 異步讀取客戶端請求訊息到緩沖區,
- 對ByteBuffer進行編解碼,如果有半包訊息指標reset,繼續讀取后續的報文,將解碼成功的訊息封裝成Task,投遞到業務執行緒池中,進行業務邏輯編排,
- 將POJO物件encode成ByteBuffer,呼叫SocketChannel的異步write介面,將訊息異步發送給客戶端,
四、NIO客戶端流程分析
- 打開
SocketChannel,系結客戶端本地地址, - 設定SocketChannel為非阻塞模式,同時設定客戶端連接的TCP引數
- 異步連接服務端
- 判斷連接是否成功,如果連接成功,則直接注冊讀狀態位到多路復用器Selector中,如果當前沒有連接成功(異步連接,回傳false,說明客戶端已經發送sync包,服務端沒有回傳ack包,物理鏈路還沒有建立)
- 向Reactor執行緒的多路復用器注冊
OP_CONNECT狀態位,監聽服務端的TCP ACK應答, - 創建Reactor執行緒,創建多路復用器
Selector并啟動執行緒, - 多路復用器在執行緒run方法的無限回圈體內輪詢準備就緒的key,
- 接收
connect事件進行處理, - 判斷連接結果,如果連接成功,注冊讀事件到多路復用器,
- 異步讀客戶端請求訊息到緩沖區,
- 對ByteBuffer進行
編解碼,如果有半包訊息接識訓沖區Reset,繼續讀取后續的提交,將解碼成功的訊息封裝成Task,投遞到業務執行緒池中,進行業務邏輯編排, - 將POJO物件encode成
ByteBuffer,呼叫SocketChannel的異步write介面將訊息異步發送給客戶端,

五、NIO編程的優點
NIO編程的難度確實比同步阻塞的BIO編程大很多,我們的NIO實體沒有考慮“半包讀”和“半包寫”,如果加上這些,代碼將會更加復雜,既然代碼這么復雜,為什么它的應用卻越來越廣泛呢?使用NIO編程的優點總結如下:
客戶端發起的連接時異步的,可以通過在多路復用器注冊OP_CONNECT等待后續結果,不需要像之的客戶端那樣被同步阻塞,SocketChannel的讀寫操作都是異步的,如果沒有可讀寫的資料它不會同步等待,直接回傳,這樣IO通信執行緒就可以處理其他的鏈路,不需要同步等待這個鏈路可用,執行緒模型的優化,由于JDK的Selector在Linux等主流作業系統上通過epoll實作,它沒有連接句柄數的限制(只受限于作業系統的最大句柄數或者對單個行程的句柄限制),這意味著一個Selector執行緒可以同時處理成千上萬個客戶端連接,而且性能不會隨著客戶端的增加而線性下降,因此,它非常適合做高性能、高負載的網路服務器,
原始碼
TimeServer
創建一個多路復用類(MultiplexerTimeServer),是一個獨立負責的執行緒,負責輪詢多路復用器Selector,可以處理多個客戶端的并發接入,
public class TimeServer {
public static void main(String[] args) {
int port = 8080;
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
}
}
MultiplexerTimeServer
package com.lsh.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
/**
* @author :LiuShihao
* @date :Created in 2021/3/1 12:03 下午
* @desc :NIO時間服務器 MulitiplexerTimeServer 多路復用器
* 它是一個獨立的執行緒,負責輪詢多路復用器Selector,可以處理多個客戶端的并發接入,
*/
public class MultiplexerTimeServer implements Runnable{
private Selector selector;
private ServerSocketChannel serverChannel;
private volatile boolean stop;
/**
* 初始化多路復用器 系結監聽埠
* 創建多路復用器Selector、ServerSocketChannel
* 對Channel和TCP引數進行配置
* @param port
*/
public MultiplexerTimeServer(int port) {
try {
//創建Reactor執行緒,創建多路復用器
selector = Selector.open();
//打開ServerSocketChannel,用于監聽客戶端的連接,它是所有客戶端連接的父管道
serverChannel = ServerSocketChannel.open();
//將Channel設定為異步非阻塞模式,他的backlog設定為1024
serverChannel.configureBlocking(false);
//系結監聽介面
serverChannel.socket().bind(new InetSocketAddress(port),1024);
//將ServerSockrtChannel注冊到Reactor執行緒的多路復用器Selector上,監聽ACCEPT事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port :"+port);
} catch (IOException e) {
e.printStackTrace();
//如果資源初始化失敗,如埠被占用,則退出
System.exit(1);
}
}
public void stop(){
this.stop = true;
}
@Override
public void run() {
/**
* 多路復用器在run方法的無限回圈體中內輪詢準備就緒的key
* 在while回圈體中,回圈遍歷selector,休眠時間為1秒,無論是否有讀寫時間發生,selector每隔1s都被喚醒1次
* 當有處于就緒狀態的Channel時,selector將回傳該Channel的SelectionKey集合,通過對就緒狀態的Channel集合進行迭代,可以進行網路的異步讀寫操作,
*/
while(!stop){
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handlerInput(key);
}catch (Exception e){
if (key != null){
key.cancel();
if (key.channel() != null){
key.channel().close();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
//多路復用器關閉 后,所有注冊在上面的Channel和Pipe等資源都會被自動用去注冊并關閉,所以不需要重復釋放資源
if (selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 根據SelectorKey的操作位進行判斷即可獲知網路時間的型別,
* @param key
* @throws IOException
*/
private void handlerInput(SelectionKey key) throws IOException{
if (key.isValid()){
//處理新接入的訊息請求
if (key.isAcceptable()){
//Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//通過ServerSocketChannel的accept接收客戶端的鏈接請求并創建SocketChannel實體,相當于完成了TCP的三次握手,TCP物理鏈路正式建立,
SocketChannel sc = ssc.accept();
//需要將新創建的SocketChannel設定為異步非阻塞,同時也可以對其TCP引數進行設定,例如TCP接收和發送緩沖區的大小等,但作為入門的例子,沒有進行額外的引數設定,
sc.configureBlocking(false);
// Add the new connection to the selector
//將新接入的客戶端連接注冊到Reactor執行緒的多路復用器上,監聽讀操作,讀取客戶端發送的網路訊息,
sc.register(selector,SelectionKey.OP_READ);
}
if (key.isReadable()){
//Read the data
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// 異步讀取客戶端訊息到緩沖區
int readBytes = sc.read(readBuffer);
if (readBytes > 0){
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("The time server receive order :"+body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(sc,currentTime);
} else if (readBytes < 0 ){
//對端鏈路關閉
key.cancel();
sc.close();
}else {
;//讀到0位元組,忽略
}
}
}
}
private void doWrite(SocketChannel channel, String response) throws IOException{
if (response != null && response.trim().length() > 0 ){
//將訊息encode成ByteBuffer,呼叫SocketChannel的異步write介面,將訊息異步發送給客戶端,
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}
TimeClient
public class TimeClient {
public static void main(String[] args) {
int port = 8080;
//通過創建TimeClient執行緒來處理異步連接和讀寫操作
new Thread(new TimeClientHandle("127.0.0.1",port),"TimeClinet-001").start();
}
}
TimeClientHandle
package com.lsh.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @author :LiuShihao
* @date :Created in 2021/3/1 2:18 下午
* @desc :NIO時間服務器客戶端 Handle
*/
public class TimeClientHandle implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandle(String host, int port) {
this.host = host == null ? "127.0.0.1" : host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
try{
doConnect();
}catch (Exception e){
e.printStackTrace();
System.exit(1);
}
while(!stop){
try{
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch (Exception e){
if (key != null){
key.cancel();
if (key.channel() != null){
key.channel().close();
}
}
}
}
}catch (Exception e){
e.printStackTrace();
System.exit(1);
}
}
// 多路復用器關閉后,所有注冊在上面的Channel和Pige等資源都會被自動去注冊并關閉,所以不需要重讀釋放資源,
if (selector != null){
try{
selector.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException{
if (key.isValid()){
//判斷是否連接成功
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()){
if (sc.finishConnect()){
sc.register(selector,SelectionKey.OP_READ);
doWrite(sc);
}else {
//連接失敗,行程退出
System.exit(1);
}
}
if (key.isReadable()){
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0 ){
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes,"UTF-8");
System.out.println("Now is :"+body);
this.stop = true;
} else if (readBytes < 0 ){
key.cancel();
sc.close();
}else {
;//讀到0位元組,忽略
}
}
}
}
private void doConnect() throws IOException{
//如果直接連接成功,則注冊到多路復用器上,發送請求訊息,讀應答
if (socketChannel.connect(new InetSocketAddress(host,port))){
socketChannel.register(selector,SelectionKey.OP_READ);
doWrite(socketChannel);
}else {
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel sc) throws IOException{
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if (!writeBuffer.hasRemaining()){
System.out.println("Send order 2 server succeed.");
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/265444.html
標籤:其他
