文章目錄
- 前言
- 需要解決的問題
- 手寫RPC實戰
- 1、定義通信協議
- 2、自定義注解
- 3、定義介面
- 4、實作介面
- 5、暴露服務并監聽處理請求
- 6、生成RPC動態代理物件
- 7、消費者注入RPC動態代理物件
- 功能測驗
- 尾巴
前言
RPC是遠程程序呼叫(Remote Procedure Call)的縮寫形式,SAP系統RPC呼叫的原理其實很簡單,有一些類似于三層構架的C/S系統,第三方的客戶程式通過介面呼叫SAP內部的標準或自定義函式,獲得函式回傳的資料進行處理后顯示或列印,
隨著微服務、分布式的大熱,開發者慢慢趨向于將一個大的服務拆分成多個獨立的小的服務,
服務經過拆分后,服務與服務之間的通信就變得至關重要,
RPC說白了就是節點A去呼叫節點B的服務,站在Java的角度看,就是像呼叫本地函式一樣呼叫遠程函式,
需要解決的問題
要想實作RPC,首先需要解決以下幾個問題:
- 服務之間如何通信?
Socket 網路IO, - 請求引數、回傳結果如何傳輸?
Java將物件序列化為位元組陣列通過網路IO傳輸, - 介面沒有實作類,該如何呼叫?
JDK動態代理生成代理物件, - 如何發起遠程呼叫?
在代理物件中發起Socket請求遠程服務器,
手寫RPC實戰
首先看下目錄結構:

1、定義通信協議
消費者發起一個呼叫請求,服務者必須知道你要調哪個服務,引數是什么,這些需要封裝好,
@Data
public class RpcMessage implements Serializable {
private static final long serialVersionUID = 1L;
private String interfaceName;//呼叫的Service介面名
private String methodName;//呼叫的方法名
private Class<?>[] argsType;//引數型別串列
private Object[] args;//引數
}
2、自定義注解
分別是服務的提供者和消費者,
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service//引入Spring Service,自動注入IOC容器
// 服務提供者
public @interface MyRpcService {
}
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
// 服務消費者
public @interface MyRpcReference {
}
3、定義介面
public interface UserService {
// 根據UserId查找用戶
R<UserResp> findById(Long userId);
}
4、實作介面
加上自定義注解@MyRpcService,后續需要掃描這些實作類,并暴露服務,
@MyRpcService
public class UserServiceImpl implements UserService{
@Override
public R<UserResp> findById(Long userId) {
UserResp userResp = new UserResp();
userResp.setId(userId);
userResp.setName("張三");
userResp.setPwd("root@abc");
return R.ok(userResp);
}
}
5、暴露服務并監聽處理請求
應用程式啟動后,從Spring的IOC容器中,找到加了@MyRpcService注解的服務,并暴露出去,
/**
* @author: pch
* @description: 程式啟動,暴露Service服務
* @date: 2020/10/13
**/
@Component
public class ProviderListener implements ApplicationListener<ApplicationStartedEvent> {
@Override
public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) {
ConfigurableApplicationContext context = applicationStartedEvent.getApplicationContext();
for (Object bean : context.getBeansWithAnnotation(MyRpcService.class).values()) {
ProviderHolder.addService(bean);
}
try {
ProviderHolder.start();
} catch (Exception e) {
e.printStackTrace();
}
System.err.println("provider...啟動");
}
}
暴露服務,處理消費者請求的核心代碼
/**
* @author: pch
* @description: 服務持有者
* @date: 2020/10/13
**/
public class ProviderHolder {
// 快取所有的服務提供者
private static final Map<String, Provider> SERVICES = new ConcurrentHashMap<>();
// 起一個執行緒池,處理消費者的請求
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
// 添加服務
public static void addService(Object bean) {
Class<?> beanClass = bean.getClass();
String interfaceName = beanClass.getInterfaces()[0].getName();
SERVICES.put(interfaceName, new Provider(bean));
}
/**
* 啟動服務
* @throws Exception
*/
public static void start() throws Exception {
if (SERVICES.isEmpty()) {
return;
}
// 開啟ServerSocket,埠3333,監聽消費者發起的請求,
ServerSocket serverSocket = new ServerSocket(3333);
while (true) {
// 當有請求到達,提交一個任務到執行緒池
Socket socket = serverSocket.accept();
EXECUTOR_SERVICE.submit(() -> {
try {
// 從網路IO中讀取消費者發送的引數
Object o = new ObjectInputStream(socket.getInputStream()).readObject();
if (o instanceof RpcMessage) {
RpcMessage message = (RpcMessage) o;
// 找到消費者要呼叫的服務
Provider provider = SERVICES.get(message.getInterfaceName());
if (provider == null) {
return;
}
// 利用反射呼叫服務
Object result = provider.invoke(message.getMethodName(), message.getArgsType(), message.getArgs());
OutputStream outputStream = socket.getOutputStream();
// 將回傳結果序列化為位元組陣列并通過Socket寫回
outputStream.write(ObjectUtil.serialize(result));
outputStream.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
6、生成RPC動態代理物件
/**
* @author: pch
* @description: 基于JDK動態代理生成代理物件,發起RPC呼叫
* @date: 2020/10/13
**/
public class RpcProxy implements InvocationHandler {
private Object origin = new Object();
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(origin, args);
}
// 開啟一個Socket
Socket socket = new Socket("127.0.0.1", 3333);
// 封裝請求協議
RpcMessage message = new RpcMessage();
message.setInterfaceName(method.getDeclaringClass().getName());
message.setMethodName(method.getName());
message.setArgsType(method.getParameterTypes());
message.setArgs(args);
// 將請求引數序列化成位元組陣列通過網路IO寫回
OutputStream outputStream = socket.getOutputStream();
outputStream.write(ObjectUtil.serialize(message));
outputStream.flush();
// 阻塞,等待服務端處理完畢回傳結果
Object o = new ObjectInputStream(socket.getInputStream()).readObject();
// 回傳給呼叫者
return o;
}
}
7、消費者注入RPC動態代理物件
/**
* @author: pch
* @description: 注入加了@MyRpcReference注解的屬性
* @date: 2020/10/13
**/
@Component
public class RpcBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> beanClass = bean.getClass();
Field[] fields = ClassUtil.getDeclaredFields(beanClass);
for (Field field : fields) {
if (field.getAnnotation(MyRpcReference.class) == null) {
continue;
}
Object proxy = Proxy.newProxyInstance(beanClass.getClassLoader(), new Class[]{field.getType()}, new RpcProxy());
field.setAccessible(true);
try {
field.set(bean, proxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
return bean;
}
}
功能測驗
核心代碼寫好了,那就可以開始測驗功能是否符合預期了,
1、啟動服務提供者

2、啟動消費者,并發起一個請求

尾巴
基于篇幅原因,本文只是實作了RPC最基本最簡單的功能,主要是理解RPC的思想,
當然,還有很多可以優化的點:
- Service暴露的所有方法快取起來,每次呼叫再反射查找開銷還是很大的,
- 使用Netty提升網路IO的通信性能,
- 連接池的引入,
- 注冊中心的加入,
- 寫回的資料沒有包裝協議,
- 資料格式的擴展,請求頭的加入,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/184849.html
標籤:其他
