作者:莫那魯道
https://www.cnblogs.com/stateis0/p/8960791.html
眾所周知,dubbo 底層使用了 Netty 作為網路通訊框架,而 Netty 的高性能我們之前也分析過原始碼,對他也算還是比較了解了,
今天我們就自己用 Netty 實作一個簡單的 RPC 框架,
1、需求
模仿 dubbo,消費者和提供者約定介面和協議,消費者遠程呼叫提供者,提供者回傳一個字串,消費者列印提供者回傳的資料,底層網路通信使用 Netty 4.1.16,
2、設計
-
創建一個介面,定義抽象方法,用于消費者和提供者之間的約定,
-
創建一個提供者,該類需要監聽消費者的請求,并按照約定回傳資料,
-
創建一個消費者,該類需要透明的呼叫自己不存在的方法,內部需要使用 Netty 請求提供者回傳資料,
3、 實作
1. 創建 maven 專案,匯入 Netty 4.1.16,
<groupId>cn.thinkinjava</groupId>
<artifactId>rpc-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
</dependencies>
2. 專案目錄結構如下:

3. 設計介面
===============
一個簡單的 hello world:
public interface HelloService {
String hello(String msg);
}
4. 提供者相關實作
==================
4.1. 首先實作約定介面,用于回傳客戶端資料:
/**
* 實作類
*/
public class HelloServiceImpl implements HelloService {
public String hello(String msg) {
return msg != null ? msg + " -----> I am fine." : "I am fine.";
}
}
4.2. 實作 Netty 服務端和自定義 handler
啟動 Netty Server 代碼:
private static void startServer0(String hostName, int port) { try {
ServerBootstrap bootstrap = new ServerBootstrap();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
bootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() { @Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(new HelloServerHandler());
}
});
bootstrap.bind(hostName, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
上面的代碼中添加了 String型別的編解碼 handler,添加了一個自定義 handler,
自定義 handler 邏輯如下:
/**
* 用于處理請求資料
*/public class HelloServerHandler extends ChannelInboundHandlerAdapter { @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // 如何符合約定,則呼叫本地方法,回傳資料
if (msg.toString().startsWith(ClientBootstrap.providerName)) {
String result = new HelloServiceImpl()
.hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(result);
}
}
}
這里顯示判斷了是否符合約定(并沒有使用復雜的協議,只是一個字串判斷),然后創建一個具體實作類,并呼叫方法寫回客戶端,為什么Netty這么火?為什么?
還需要一個啟動類:
public class ServerBootstrap { public static void main(String[] args) {
NettyServer.startServer("localhost", 8088);
}
}
好,關于提供者的代碼就寫完了,主要就是創建一個 netty 服務端,實作一個自定義的 handler,自定義 handler 判斷是否符合之間的約定(算是協議吧),如果符合,就創建一個介面的實作類,并呼叫他的方法回傳字串,
5. 消費者相關實作
消費者有一個需要注意的地方,就是呼叫需要透明,也就是說,框架使用者不用關心底層的網路實作,這里我們可以使用 JDK 的動態代理來實作這個目的,
思路:客戶端呼叫代理方法,回傳一個實作了 HelloService 介面的代理物件,呼叫代理物件的方法,回傳結果,
我們需要在代理中做手腳,當呼叫代理方法的時候,我們需要初始化 Netty 客戶端,還需要向服務端請求資料,并回傳資料,
5.1. 首先創建代理相關的類
public class RpcConsumer { private static ExecutorService executor = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static HelloClientHandler client; /**
* 創建一個代理物件
*/
public Object createProxy(final Class<?> serviceClass, final String providerName) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, (proxy, method, args) -> { if (client == null) {
initClient();
} // 設定引數
client.setPara(providerName + args[0]); return executor.submit(client).get();
});
} /**
* 初始化客戶端
*/
private static void initClient() {
client = new HelloClientHandler();
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() { @Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(client);
}
}); try {
b.connect("localhost", 8088).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
該類有 2 個方法,創建代理和初始化客戶端,
初始化客戶端邏輯:創建一個 Netty 的客戶端,并連接提供者,并設定一個自定義 handler,和一些 String 型別的編解碼器,
創建代理邏輯:使用 JDK 的動態代理技術,代理物件中的 invoke 方法實作如下:如果 client 沒有初始化,則初始化 client,這個 client 既是 handler ,也是一個 Callback,將引數設定進 client ,使用執行緒池呼叫 client 的 call 方法并阻塞等待資料回傳,
看看 HelloClientHandler 的實作:
public class HelloClientHandler extends ChannelInboundHandlerAdapter implements Callable { private ChannelHandlerContext context; private String result; private String para; @Override
public void channelActive(ChannelHandlerContext ctx) {
context = ctx;
} /**
* 收到服務端資料,喚醒等待執行緒
*/
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) {
result = msg.toString();
notify();
} /**
* 寫出資料,開始等待喚醒
*/
@Override
public synchronized Object call() throws InterruptedException {
context.writeAndFlush(para);
wait(); return result;
} void setPara(String para) { this.para = para;
}
}
該類快取了 ChannelHandlerContext,用于下次使用,有兩個屬性:回傳結果和請求引數,
當成功連接后,快取 ChannelHandlerContext,當呼叫 call 方法的時候,將請求引數發送到服務端,等待,當服務端收到并回傳資料后,呼叫 channelRead 方法,將回傳值賦值個 result,并喚醒等待在 call 方法上的執行緒,此時,代理物件回傳資料,
再看看設計的測驗類:
public class ClientBootstrap { public static final String providerName = "HelloService#hello#"; public static void main(String[] args) throws InterruptedException {
RpcConsumer consumer = new RpcConsumer(); // 創建一個代理物件
HelloService service = (HelloService) consumer
.createProxy(HelloService.class, providerName); for (; ; ) {
Thread.sleep(1000);
System.out.println(service.hello("are you ok ?"));
}
}
}
測驗類首先創建了一個代理物件,然后每隔一秒鐘呼叫代理的 hello 方法,并列印服務端回傳的結果,
測驗結果

成功列印,
4、總結
看了這么久的 Netty 原始碼,我們終于實作了一個自己的 Netty 應用,雖然這個應用很簡單,甚至代碼寫的有些粗糙,但功能還是實作了,RPC 的目的就是允許像呼叫本地服務一樣呼叫遠程服務,需要對使用者透明,于是我們使用了動態代理,并使用 Netty 的 handler 發送資料和回應資料,完成了一次簡單的 RPC 呼叫,
當然,還是那句話,代碼比較簡單,主要是思路,以及了解 RPC 底層的實作,
近期熱文推薦:
1.Java 15 正式發布, 14 個新特性,重繪你的認知!!
2.終于靠開源專案弄到 IntelliJ IDEA 激活碼了,真香!
3.我用 Java 8 寫了一段邏輯,同事直呼看不懂,你試試看,,
4.吊打 Tomcat ,Undertow 性能很炸!!
5.《Java開發手冊(嵩山版)》最新發布,速速下載!
覺得不錯,別忘了隨手點贊+轉發哦!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/239871.html
標籤:Java
