您好,我是湘王,這是我的博客園,歡迎您來,歡迎您再來~
在Java NIO的三大核心中,除了Channel和Buffer,剩下的就是Selector了,有的地方叫它選擇器,也有叫多路復用器的(比如Netty),
之前提過,資料總是從Channel讀取到Buffer,或者從Buffer寫入到Channel,單個執行緒可以監聽多個Channel——Selector就是這個執行緒背后的實作機制(所以得名Selector),

Selector通過控制單個執行緒處理多個Channel,如果應用打開了多個Channel,但每次傳輸的流量都很低,使用Selector就會很方便(至于為什么,具體到Netty中再分析),所以使用Selector的好處就顯而易見:用最少的資源實作最多的操作,避免了執行緒切換帶來的開銷,
還是以代碼為例來演示Selector的作用,新建一個類,在main()方法中輸入下面的代碼:
/** * NIO中的Selector * * @author xiangwang */ public class TestSelector { public static void main(String args[]) throws IOException { // 創建ServerSocketChannel ServerSocketChannel channel1 = ServerSocketChannel.open(); channel1.socket().bind(new InetSocketAddress("127.0.0.1", 8080)); channel1.configureBlocking(false); ServerSocketChannel channel2 = ServerSocketChannel.open(); channel2.socket().bind(new InetSocketAddress("127.0.0.1", 9090)); channel2.configureBlocking(false); // 創建一個Selector物件 Selector selector = Selector.open(); // 按照字面意思理解,應該是這樣的:selector.register(channel, event); // 但其實是這樣的:channel.register(selector, SelectionKey.OP_READ); // 四種監聽事件: // OP_CONNECT(連接就緒) // OP_ACCEPT(接收就緒) // OP_READ(讀就緒) // OP_WRITE(寫就緒) // 注冊Channel到Selector,事件一旦被觸發,監聽隨之結束 SelectionKey key1 = channel1.register(selector, SelectionKey.OP_ACCEPT); SelectionKey key2 = channel2.register(selector, SelectionKey.OP_ACCEPT); // 模板代碼:在撰寫程式時,大多數時間都是在模板代碼中添加相應的業務代碼 while(true) { int readyNum = selector.select(); if (readyNum == 0) { continue; } Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 輪詢 for (SelectionKey key : selectedKeys) { Channel channel = key.channel(); if (key.isConnectable()) { if (channel == channel1) { System.out.println("channel1連接就緒"); } else { System.out.println("channel2連接就緒"); } } else if (key.isAcceptable()) { if (channel == channel1) { System.out.println("channel1接收就緒"); } else { System.out.println("channel2接收就緒"); } } // 觸發后洗掉,這里不刪 // it.remove(); } } } }
代碼寫好后啟動ServerSocketChannel服務,可以看到我這里已經啟動成功:

然后在網上下載一個叫做SocketTest.jar的工具(在一些工具網站下載的時候當心中毒,如果不放心,可以私信我,給你地址),雙擊打開,并按下圖方式執行:

點擊「Connect」可以看到變化:

然后點擊「Disconnect」,再輸入「9090」后,再點擊「Connect」試試:

可以看到結果顯示結果變了:

兩次連接,列印了三條資訊:說明selector的輪詢在起作用(因為Set<SelectionKey>中包含了所有處于監聽的SelectionKey),但是「接收就緒」監聽事件僅執行了一次就再不回應,如果感興趣的話你可以把OP_READ、OP_WRITE這些事件也執行一下試試看,
因為Selector是單執行緒輪詢監聽多個Channel,那么如果Selector(執行緒)之間需要傳遞資料,怎么辦呢?——Pipe登場了,Pipe就是一種用于Selector之間資料傳遞的「管道」,
先來看個圖:

可以清楚地看到它的作業方式,
還是用代碼來解釋,
/** * NIO中的Pipe * * @author xiangwang */ public class TestPipe { public static void main(String args[]) throws IOException { // 打開管道 Pipe pipe = Pipe.open(); // 將Buffer資料寫入到管道 Pipe.SinkChannel sinkChannel = pipe.sink(); ByteBuffer buffer = ByteBuffer.allocate(32); buffer.put("ByteBuffer".getBytes()); // 切換到寫模式 buffer.flip(); sinkChannel.write(buffer); // 從管道讀取資料 Pipe.SourceChannel sourceChannel = pipe.source(); buffer = ByteBuffer.allocate(32); sourceChannel.read(buffer); System.out.println(new String(buffer.array())); // 關閉管道 sinkChannel.close(); sourceChannel.close(); } }
之前說過,同步指的按順序一次完成一個任務,直到前一個任務完成并有了結果以后,才能再執行后面的任務,而異步指的是前一個任務結束后,并不等待任務結果,而是繼續執行后一個任務,在所有任務都「執行」完后,通過任務的回呼函式去獲得結果,所以異步使得應用性能有了極大的提高,為了更加生動地說明什么是異步,可以來做個實驗:

通過呼叫CompletableFuture.supplyAsync()方法可以很明顯地觀察到,處于位置2的「這一步先執行」會最先顯示,然后才執行位置1的代碼,而這就是異步的具體實作,
NIO為了支持異步,升級到了NIO2,也就是AIO,而AIO引入了新的異步Channel的概念,并提供了異步FileChannel和異步SocketChannel的實作,AIO的異步SocketChannel是真正的異步非阻塞I/O,通過代碼可以更好地說明:
/** * AIO客戶端 * * @author xiangwang */ public class AioClient { public void start() throws IOException, InterruptedException { AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(); if (channel.isOpen()) { // socket接識訓沖區recbuf大小 channel.setOption(StandardSocketOptions.SO_RCVBUF, 128 * 1024); // socket發送緩沖區recbuf大小 channel.setOption(StandardSocketOptions.SO_SNDBUF, 128 * 1024); // 保持長連接狀態 channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); // 連接到服務端 channel.connect(new InetSocketAddress(8080), null, new AioClientHandler(channel)); // 阻塞主行程 for(;;) { TimeUnit.SECONDS.sleep(1); } } else { throw new RuntimeException("Channel not opened!"); } } public static void main(String[] args) throws IOException, InterruptedException { new AioClient().start(); } }
/** * AIO客戶端CompletionHandler * * @author xiangwang */ public class AioClientHandler implements CompletionHandler<Void, AioClient> { private final AsynchronousSocketChannel channel; private final CharsetDecoder decoder = Charset.defaultCharset().newDecoder(); private final BufferedReader input = new BufferedReader(new InputStreamReader(System.in)); public AioClientHandler(AsynchronousSocketChannel channel) { this.channel = channel; } @Override public void failed(Throwable exc, AioClient attachment) { throw new RuntimeException("channel not opened!"); } @Override public void completed(Void result, AioClient attachment) { System.out.println("send message to server: "); try { // 將輸入內容寫到buffer String line = input.readLine(); channel.write(ByteBuffer.wrap(line.getBytes())); // 在作業系統中的Java本地方法native已經把資料寫到了buffer中 // 這里只需要一個緩沖區能接收就行了 ByteBuffer buffer = ByteBuffer.allocate(1024); while (channel.read(buffer).get() != -1) { buffer.flip(); System.out.println("from server: " + decoder.decode(buffer).toString()); if (buffer.hasRemaining()) { buffer.compact(); } else { buffer.clear(); } // 將輸入內容寫到buffer line = input.readLine(); channel.write(ByteBuffer.wrap(line.getBytes())); } } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
/** * AIO服務端 * * @author xiangwang */ public class AioServer { public void start() throws InterruptedException, IOException { AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(); if (channel.isOpen()) { // socket接受緩沖區recbuf大小 channel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024); // 埠重用,防止行程意外終止,未釋放埠,重啟時失敗 // 因為直接殺行程,沒有顯式關閉套接字來釋放埠,會等待一段時間后才可以重新use這個關口 // 解決辦法就是用SO_REUSEADDR channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.bind(new InetSocketAddress(8080)); } else { throw new RuntimeException("channel not opened!"); } // 處理client連接 channel.accept(null, new AioServerHandler(channel)); System.out.println("server started"); // 阻塞主行程 for(;;) { TimeUnit.SECONDS.sleep(1); } } public static void main(String[] args) throws IOException, InterruptedException { AioServer server = new AioServer(); server.start(); } }
/** * AIO服務端CompletionHandler * * @author xiangwang */ public class AioServerHandler implements CompletionHandler<AsynchronousSocketChannel, Void> { private final AsynchronousServerSocketChannel serverChannel; private final CharsetDecoder decoder = Charset.defaultCharset().newDecoder(); private final BufferedReader input = new BufferedReader(new InputStreamReader(System.in)); public AioServerHandler(AsynchronousServerSocketChannel serverChannel) { this.serverChannel = serverChannel; } @Override public void failed(Throwable exc, Void attachment) { // 處理下一次的client連接 serverChannel.accept(null, this); } @Override public void completed(AsynchronousSocketChannel result, Void attachment) { // 處理下一次的client連接,類似鏈式呼叫 serverChannel.accept(null, this); try { // 將輸入內容寫到buffer String line = input.readLine(); result.write(ByteBuffer.wrap(line.getBytes())); // 在作業系統中的Java本地方法native已經把資料寫到了buffer中 // 這里只需要一個緩沖區能接收就行了 ByteBuffer buffer = ByteBuffer.allocate(1024); while (result.read(buffer).get() != -1) { buffer.flip(); System.out.println("from client: " + decoder.decode(buffer).toString()); if (buffer.hasRemaining()) { buffer.compact(); } else { buffer.clear(); } // 將輸入內容寫到buffer line = input.readLine(); result.write(ByteBuffer.wrap(line.getBytes())); } } catch (InterruptedException | ExecutionException | IOException e) { e.printStackTrace(); } } }
執行測驗后顯示,不管是在客戶端還是在服務端,讀寫完全是異步的,
感謝您的大駕光臨!咨詢技術、產品、運營和管理相關問題,請關注后留言,歡迎騷擾,不勝榮幸~
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/518799.html
標籤:Java
