1.基本概念
RPC是Remote Procedure Call的縮寫,即遠程程序呼叫,最基本的RPC模型如下圖所示,
在下圖中,服務提供者A Server、服務消費者B Server,服務消費者只需要通過介面,就可以遠程呼叫服務提供者提供的對應的介面的實作,從而獲取回傳值,完成對應的呼叫程序,

2.具體實作
以下上代碼實體,通過Socket的方式,來實作自己的RPC框架,專案結構如下圖:

專案分為四個模塊:rpc模塊、公共介面模塊、服務提供者模塊、服務消費者模塊,
2.1 rpc模塊
rpc模塊主要實作了RPC遠程程序呼叫的功能,該模塊中,分為三個部分,client端、server端、通信協議,如下圖所示,

2.1.1通信協議
在RPC框架中,客戶端的方法要通過介面呼叫服務端對應介面的實作類的方法,需要提供必要的資訊,具體包括:介面名稱、方法名稱、引數型別串列、引數值串列,代碼如下所示:
package com.supger.rpc.core.protocol;
import java.io.Serializable;
/**
* RPC框架請求協議類,由于這個類需要在網路中傳輸,需要實作序列化介面,
*/
public class RequestProtocol implements Serializable {
/**
* 介面全名稱
*/
private String interfaceClassName;
/**
* 方法名稱
*/
private String methodName;
/**
* 引數型別串列
*/
private Class<?>[] parameterTypes;
/**
* 引數值串列
*/
private Object[] parameterValues;
public String getInterfaceClassName() {
return interfaceClassName;
}
public void setInterfaceClassName(String interfaceClassName) {
this.interfaceClassName = interfaceClassName;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getParameterValues() {
return parameterValues;
}
public void setParameterValues(Object[] parameterValues) {
this.parameterValues = parameterValues;
}
}
2.1.2客戶端
在RPC框架中,客戶端通過動態代理,在動態代理中,把需要呼叫的服務端的介面的相關資訊序列化,最終通過Socket的方式,發送給服務端,等待服務端完成呼叫,最終把結果寫入Socket,動態代理內部,獲取回傳結果,回傳給呼叫方,代碼如下所示:
package com.supger.rpc.core.client;
import com.supger.rpc.core.protocol.RequestProtocol;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
* RPC框架客戶端核心實作類
*/
public class RpcClient {
/**
* 通過動態代理獲取呼叫介面對應實體
* @param interfaceClass
* @param address
* @param <T>
* @return
*/
public static <T> T getRemoteProxy(Class<T> interfaceClass, InetSocketAddress address){
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try(Socket socket = new Socket()){
// 通過網路連接服務端
socket.connect(address);
try(
// 獲取輸出流
ObjectOutputStream serializer = new ObjectOutputStream(socket.getOutputStream());
// 獲取輸入流
ObjectInputStream deSerializer = new ObjectInputStream(socket.getInputStream())
){
// 創建一個RPC框架中請求協議物件
RequestProtocol requestProtocol = new RequestProtocol();
// 填充屬性
requestProtocol.setInterfaceClassName(interfaceClass.getName());
requestProtocol.setMethodName(method.getName());
requestProtocol.setParameterTypes(method.getParameterTypes());
requestProtocol.setParameterValues(args);
// 序列化協議物件(把資料放入到網路中)
serializer.writeObject(requestProtocol);
/**
* -----------------------
* 同步程序
* -----------------------
*/
// 反序列化(從網路中獲取服務端放入的資料)
Object result = deSerializer.readObject();
return result;
}
}catch (Exception e){
e.printStackTrace();
}
return null;
}
});
}
}
2.1.3服務端
在RPC框架中,服務端需要啟動,監聽對應的埠,客戶端來連接對應的埠,啟動監聽后,當有多個客戶端請求來的時候,為了不因為前一個請求未處理完成而發生阻塞,需要引入多執行緒機制,當服務端監聽到客戶端連接的時候,用執行緒池來處理對應的任務,任務中保存著對應獲取的Socket連接資訊,在任務的內部,run方法中,實作對應的獲取客戶端的呼叫資訊,通過反射的方式,呼叫對應的實作類,而在此之前,服務端需要暴露對應的服務串列,代碼如下所示:
package com.supger.rpc.core.server;
import com.supger.rpc.core.protocol.RequestProtocol;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* RPC框架服務端的核心實作類
* 核心實作步驟:
* 1、暴露需要呼叫的服務介面
* 2、啟動服務端
*/
public class RpcServer {
/**
* 定義存盤暴露的服務串列
*/
Map<String,Object> serverMap = new ConcurrentHashMap<>(32);
/**
* 定義一個執行緒池
*/
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(8,20,200, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<>(10));
/**
* 暴露服務的方法
* @param interfaceClass
* @param instance
*/
public void publishServiceAPI(Class<?> interfaceClass,Object instance){
this.serverMap.put(interfaceClass.getName(),instance);
}
/**
* 定義發布服務的方法
* @param port
*/
public void start(int port){
// 創建網路服務端
try {
ServerSocket serverSocket = new ServerSocket();
// 系結指定的埠
serverSocket.bind(new InetSocketAddress(port));
System.out.println("=============Supger RPC Server Starting ...... ================");
while (true){
poolExecutor.execute(new ServerTask(serverSocket.accept()));
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 創建客戶端請求處理的執行緒類
*/
private class ServerTask implements Runnable{
private final Socket socket;
private ServerTask(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try(
ObjectInputStream deSerializer = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream serializer = new ObjectOutputStream(socket.getOutputStream())
){
// 反序列化獲取客戶端傳入的資料
RequestProtocol requestProtocol = (RequestProtocol) deSerializer.readObject();
// 獲取介面全名稱
String interfaceClassName = requestProtocol.getInterfaceClassName();
Object instance = serverMap.get(interfaceClassName);
if (null == instance){
return;
}
// 創建一個方法物件(反射方式)
Method method = instance.getClass()
.getDeclaredMethod(requestProtocol.getMethodName(), requestProtocol.getParameterTypes());
// 呼叫方法
Object result = method.invoke(instance, requestProtocol.getParameterValues());
// 序列化呼叫結果
serializer.writeObject(result);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
2.2 user-api
在RPC系統中,遠程程序呼叫需要遵循一套統一的API模塊和統一的java-bean,于是,把對應的api和javabean抽取出來,形成一個單獨的模塊(專案),遠程程序呼叫,就基于這樣的一套標準介面,這里僅舉一個簡單的例子,目的在于幫助理解RPC的原理及實作,代碼如下所示:
package service;
/**
* 定義暴露給客戶端的服務介面
*/
public interface UserService {
String addUserName(String name);
}
2.3 user-provider
在RPC系統中,服務提供者,即服務的提供方,需要實作提供服務的對應的介面,即實作2.2 user-api中UserService介面,實作類代碼如下所示,
package com.supger.service;
import service.UserService;
public class UserServiceImpl implements UserService {
@Override
public String addUserName(String name) {
return "添加的姓名為:"+name;
}
}
需要強調的是,該處實作的介面,需要引入公共的介面,
在實作介面之后,需要啟動對應的服務,對應的代碼如下所示,
package com.supger;
import com.supger.rpc.core.server.RpcServer;
import com.supger.service.UserServiceImpl;
import service.UserService;
public class App {
public static void main(String[] args) {
// 創建一個RPC的服務端
RpcServer rpcServer = new RpcServer();
// 發布暴露服務
rpcServer.publishServiceAPI(UserService.class,new UserServiceImpl());
// 啟動服務
rpcServer.start(12345);
}
}
在上述代碼中,先創建一個RPC的服務端,然后發布暴露對應的服務,最后啟動服務,
2.4 user-consumer
在完成上述步驟之后,最后創建一個服務的消費者,在消費者中,通過呼叫RPC框架,完成對應的遠程程序呼叫,就像呼叫本地方法一樣簡單,對應的代碼如下所示,
package com.supger;
import com.supger.rpc.core.client.RpcClient;
import service.UserService;
import java.net.InetSocketAddress;
public class App {
public static void main(String[] args) {
// 在客戶端呼叫RPC框架
UserService userService = RpcClient.getRemoteProxy(UserService.class, new InetSocketAddress("127.0.0.1", 12345));
String result = userService.addUserName("supger");
System.out.println(result);
}
}
至此,完成了實作自己的RPC框架的的流程,
3.運行測驗
啟動服務提供模塊,如下圖所示,表明服務已成功啟動,

啟動服務消費模塊,如下圖所示,

從運行結果可知,通過以上程序,實作了自己的RPC框架,
4. 結語
該程序涉及到Java中的一些基本的知識,具體包括:反射、序列化流、動態代理、多執行緒、執行緒池,后續會對相關概念做一個詳盡的總結,目的一方面在于打好基本功,同時方便各位學習交流之用,
上述的實作程序,僅作為對RPC原理認識的一個初步方案,后續會學習RPC實作的相關框架Dubbo,等,屆時,會進一步加深對RPC系統需要考慮的多方面的因素進行探討,
關于RPC探討的文章有很多,以下列出了一些參考文章,從不同的文章出發,會有不同的識訓,從而加深對RPC進一步的理解,(本文為基于參考資源1的整理,特此注明參考來源)
5.參考資源
1.原來RPC框架原理這么學習,完全是一種享受!
2.初識RPC
3.RPC基本原理
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/245661.html
標籤:java
