基于nio單reactor 多作業執行緒模型
客戶端
public class TcpClient {
public static void main(String[] args) throws IOException {
//開啟網路channel
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",88));
socketChannel.write(ByteBuffer.wrap("hello world".getBytes()));
while (true);
}
}
服務器
public class ReactorServer {
//開始監聽服務
public static void start(Integer port){
try {
//1.服務端開啟一個監聽通道
ServerSocketChannel serverSocketChannel= ServerSocketChannel.open();
//2.系結埠
serverSocketChannel.bind(new InetSocketAddress(port));
//3.設定為非阻塞
serverSocketChannel.configureBlocking(false);
//開啟一個selector
Selector selector =Selector.open();
//注冊channel 和要監聽的時間
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,new Acceptor(selector,serverSocketChannel));
while (selector.select()>0){
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
//處理事件
while (iterator.hasNext()){
SelectionKey key = iterator.next();
Runnable handler =(Runnable) key.attachment();
handler.run();
//將事件移除
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
start(88);
}
}
Accepor
public class Acceptor implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector,ServerSocketChannel serverSocketChannel){
this.selector=selector;
this.serverSocketChannel=serverSocketChannel;
}
@Override
public void run() {
try{
SocketChannel socketChannel = serverSocketChannel.accept();
//設定非阻塞
socketChannel.configureBlocking(false);
//注冊selector,并交個dispatchhandler處理
System.out.println("有客戶端連接進來");
socketChannel.register(selector,SelectionKey.OP_READ,new DispatchHandler(socketChannel));
}catch (IOException e){
e.printStackTrace();
}
}
}
DispatchHandler
public class DispatchHandler implements Runnable {
private static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()<<1);
private SocketChannel socketChannel;
public DispatchHandler(SocketChannel socketChannel){
this.socketChannel= socketChannel;
}
@Override
public void run() {
System.out.println("開啟執行緒處理讀寫事件");
//通過執行緒池
executor.execute(new ReadHandler(socketChannel));
}
}
ReadHandler
public class ReadHandler implements Runnable {
private SocketChannel socketChannel;
public ReadHandler(SocketChannel socketChannel){
this.socketChannel=socketChannel;
}
@Override
public void run() {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
Charset charset = Charset.forName("utf-8");
try {
while (socketChannel.read(byteBuffer)>0){
byteBuffer.flip();
System.out.println(charset.decode(byteBuffer).toString());
}
byteBuffer.clear();
//資料回寫
socketChannel.write(ByteBuffer.wrap("收到".getBytes()));
}catch (IOException e){
e.printStackTrace();
}
}
}
下面是服務器運行結果
有客戶端連接進來
開啟執行緒處理讀寫事件
開啟執行緒處理讀寫事件
開啟執行緒處理讀寫事件
.....
hello world
為什么讀事件會被重復執行 ,我上面都洗掉了嘛
uj5u.com熱心網友回復:
DispatchHandler 初始化 先在物件池中建立最小數量的物件uj5u.com熱心網友回復:
哪個物件池 是執行緒池嗎
uj5u.com熱心網友回復:
就是這個,執行緒池
private static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()<<1);
uj5u.com熱心網友回復:
DispatchHandler 初始化 先在物件池中建立最小數量的物件
哪個物件池 是執行緒池嗎
就是這個,執行緒池
private static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()<<1);
我把 這個設定成1 還是一樣,我想了下 這可能是因為java nio 是基于epoll的 LT 水平觸發造成的
雖然我開啟了子執行緒去讀 但執行緒并不是馬上執行,selector 后面檢測的時候 資料還沒被讀走 所有 又生成了READ 事件
不知道 有什么方法可以解決
uj5u.com熱心網友回復:
你用的NIO具體是什么?是netty嗎?在什么作業系統測驗的?
uj5u.com熱心網友回復:
你用的NIO具體是什么?是netty嗎?
在什么作業系統測驗的?
沒用netty 就是java nio包 ,windows
uj5u.com熱心網友回復:
上netty吧,自己寫后面還不知道多少坑。uj5u.com熱心網友回復:
樓主說的讀事件,其實是 可讀事件。也就是說,channel 可讀的時候,就會生成這個事件,而不是說,收到資料之后會生成這個事件。
可讀事件,和,可寫事件,其實是一個意思,就是channel是否可以進行讀取或者寫入操作。
至于,是否能讀到資料,或者,是否能寫入資料,還要具體操作之后,才能確定。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/58317.html
標籤:Java SE
上一篇:各位懂jsp的大佬看看我!!
