背景
專案基于sofa jraft構建,順便使用了其自帶的rpc服務,協議使用protobuf,使用jraft創建一個rpc服務RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint),并
新增的rpc介面,這通常需要定義自己的Processor并繼承com.alipay.sofa.jraft.rpc.RpcRequestProcessor,然后創建一個實體,使用rpcServer.registerProcessor將實體暴露的rpc注冊到RpcServer中,這里的待改善問題是當我們的介面變多時,Processor并不容易管理,同時個人認為,定義Processor的程序是繁瑣和枯燥的,幾乎都是一個模板,而我很懶哈哈,不想一個個寫,除了業務,不想來回寫,下面看看怎么簡化這個流程吧,
下面使用bitlap的一個創建會話的Processor來說明
// networkService是業務邏輯所在類
class OpenSessionProcessor(private val networkService: NetworkService, executor: Executor = null)
extends BitlapRpcProcessor[BOpenSessionReq](executor, BOpenSessionResp.getDefaultInstance) { // BitlapRpcProcessor將RpcRequestProcessor的handleRequest方法處理分為正常處理processRequest和processError例外處理
override def processRequest(request: BOpenSessionReq, done: RpcRequestClosure): Message = {
import scala.jdk.CollectionConverters.MapHasAsScala
val username = request.getUsername
val password = request.getPassword
val configurationMap = request.getConfigurationMap
val sessionHandle = networkService.openSession(username, password,
configurationMap.asScala.toMap)
BOpenSessionResp.newBuilder()
.setSessionHandle(sessionHandle.toBSessionHandle())
.setStatus(success()).build()
}
override def processError(rpcCtx: RpcContext, exception: Exception): Message = {
BOpenSessionResp.newBuilder().setStatus(error(exception)).build()
}
override def interest(): String = classOf[BOpenSessionReq].getName
}
這里面其實沒什么代碼,主要是代碼在networkService中,但是這里需要創建OpenSessionProcessor這個類,算是個模板類,這里雖然使用BitlapRpcProcessor做了一次優化,但是效果一般,(畢竟我是個懶人,能少寫一行代碼都是好的,哈哈)
BitlapRpcProcessor的定義如下:
abstract class BitlapRpcProcessor[T <: Message](executor: Executor, override val defaultResp: Message)
extends RpcRequestProcessor[T](executor, defaultResp)
with ProcessorHelper with LazyLogging {
override def handleRequest(rpcCtx: RpcContext, request: T) {
try {
val msg = processRequest(request, new RpcRequestClosure(rpcCtx, this.defaultResp))
if (msg != null) {
rpcCtx.sendResponse(msg)
}
} catch {
case e: Exception =>
logger.error(s"handleRequest $request failed", e)
rpcCtx.sendResponse(processError(rpcCtx, e))
}
}
def processError(rpcCtx: RpcContext, exception: Exception): Message
}
使用宏方法進行優化
根據如上所示,我們的目的是通過一種方法宏的處理,在不創建新的類檔案的情況下創建com.alipay.sofa.jraft.rpc.RpcRequestProcessor的實體,這個宏定義為Processable
考慮:
- 泛型、型別安全
- 業務處理
- 自定義拓展
一般Processor是使用RpcRequestProcessor的建構式派生子類,這里的2個建構式分別是執行請求的和protobuf Message型別的回應訊息的默認實體
public RpcRequestProcessor(Executor executor, Message defaultResp) {
super();
this.executor = executor;
this.defaultResp = defaultResp;
}
Processable宏的初步設計
下面使用黑盒宏來實作,
宏的定義
def apply[Req <: Message, Service, Executor <: java.util.concurrent.Executor]
(service: Service, defaultResp: Message, executor: Executor)
(
processRequest: (Service, RpcRequestClosure, Req) ? Message,
processException: (Service, RpcContext, Exception) ? Message): CustomRpcProcessor[Req]
= macro ProcessableMacro.processorImpl[Req, Service, Executor]
泛型說明:
Reqprotobuf定義的型別,用于request的訊息型別,必須是com.google.protobuf.Message的子類,Service用戶自定義的服務介面,用于處理業務邏輯,可以為任意型別,Executor用于傳遞給RpcRequestProcessor的建構式,必須是java.util.concurrent.Executor的子類,
引數說明:
processRequest: (Service, RpcRequestClosure, Req) ? Message一個處理請求的函式,可以實作任意業務邏輯,最重要的引數,processException: (Service, RpcContext, Exception) ? Message一個處理例外的函式,service: Service操作業務所需要的實體物件,defaultResp: Messageprotobuf定義的型別的默認實體,用于傳遞給RpcRequestProcessor的建構式,executor: Executor用于傳遞給RpcRequestProcessor的建構式,必須是java.util.concurrent.Executor的子類,
回傳的
Message通常是自己定義的用于回應的protobuf物件的子類
初步的設計照搬了OpenSessionProcessor的實作,只是使用宏創建類的實體物件,所以乍一看引數很多,不便使用,
考慮到大多數情況下并不需要這么靈活的定義,還是可以再簡化一下宏定義的,先看protobuf例子,
示例
對于現有protobuf檔案:
message BOpenSession {
message BOpenSessionReq {
string username = 1;
string password = 2;
map<string, string> configuration = 3;
}
message BOpenSessionResp {
string status = 1;
map<string, string> configuration = 2;
string session_handle = 3;
}
}
使用Processable宏:
val openSession = Processable[BOpenSessionReq, NetService, Executor](
(service, rpcRequestClosure, req) => {
import scala.jdk.CollectionConverters.MapHasAsScala
val username = req.getUsername
val password = req.getPassword
val configurationMap = req.getConfigurationMap
val ret = service.openSession(username, password, configurationMap.asScala.toMap)
BOpenSessionResp.newBuilder().setSessionHandle(ret).build()
},
(service, rpcContext, exception) => {
BOpenSessionResp.newBuilder().setStatus(exception.getLocalizedMessage).build()
},
new NetService, BOpenSessionResp.getDefaultInstance, null
)
Processable宏的改進 一
本次改進不是為了拓展,而是為了在一般情況下,宏方法容易使用,目標當然是減少引數傳遞,但是哪些引數可以減少呢?
下面列了2個引數一般都是默認值,所以就可以簡化它,需要注意,這里簡化并不是就不支持傳遞這些引數了,因為Scala object的apply方法是能多載的,所以是共存的,
為什么我們要使用apply方法?object的apply能使得我們使用Processable[T](xx)的形式來呼叫,而不需要Processable[T].toProcessor(xx),是不是更清爽了,哈哈,
宏的定義
def apply[Service, Req <: Message, Resp <: Message](service: Service)
(
processRequest: (Service, RpcRequestClosure, Req) ? Message,
processException: (Service, RpcContext, Exception) ? Message): CustomRpcProcessor[Req] =
macro ProcessableMacro.processorWithDefaultRespImpl[Service, Req, Resp]
executor直接使用null,不支持傳入自定義引數,defaultResp直接使用Resp.getDefaultInstance創建默認物件 ,不支持傳入自定義引數,
與第一次定義很類似,僅是省略了executor和defaultResp引數,但是泛型引數都保留了,這是為了型別安全,這次由于沒有傳defaultResp,所以需要使用泛型Resp指定默認值的型別,其實內部仍是是使用了getDefaultInstance,這里也能觀察到,靈活性和便捷性是不可都得的,
示例
val openSession = Processable[NetService, BOpenSessionReq, BOpenSessionResp](new NetService)(
(service, _, req) => {
import scala.jdk.CollectionConverters.MapHasAsScala
val username = req.getUsername
val password = req.getPassword
val configurationMap = req.getConfigurationMap
val ret = service.openSession(username, password, configurationMap.asScala.toMap)
BOpenSessionResp.newBuilder().setSessionHandle(ret).build()
},
(_, _, exception) => {
BOpenSessionResp.newBuilder().setStatus(exception.getLocalizedMessage).build()
}
)
Processable宏的改進 二
到上面為止,其實差不多可以了,再次簡化就只能是連service都不傳了,可以做到嗎?答案是肯定的,我們可以用運行時反射來創建物件,
雖然之前都做的是編譯期反射,這次結合編譯期和運行期來看看具體應用,
- 為
Service泛型反射出物件,不再需要傳service引數 - 僅支持非抽象類且必須含有默認無參建構式
Scala如何反射一個類來創建物件呢?
我們定義一個Creator,通過引數T:WeakTypeTag反射,WeakTypeTag由編譯器創建,使用T的tpe屬性可以反射T,
WeakTypeTag力求盡可能是具體的型別,即如果TypeTag可用于參考的型別引數或抽象型別,則它們用于將具體型別嵌入WeakTypeTag,
否則WeakTypeTag將包含對抽象型別的參考,當人們期望T可能是部分抽象的,但需要特別小心來處理這種情況時,這種行為是有用的,
但是,如果T應該是完全已知的,則應該使用TypeTag,它靜態地保證了這個屬性,TypeTag它不包含任何對未決議型別引數或抽象型別的參考,
Scala的抽象語法樹除了三個欄位外,是不可變的,這三個就是symbol,pos,tpe,對于編譯器而言,型別檢查不是一步到位的,所以pos,tpe,symbol這種屬性,可能在某階段是沒有值的,
而在typechecked后就能獲取到實際值,這在編譯期反射中很有用,
class Creator[T: WeakTypeTag] {
def createInstance(args: AnyRef*)(ctor: Int = 0): T = {
val tt = weakTypeTag[T]
currentMirror.reflectClass(tt.tpe.typeSymbol.asClass).reflectConstructor(
tt.tpe.members.filter(m =>
m.isMethod && m.asMethod.isConstructor
).iterator.toSeq(ctor).asMethod
)(args: _*).asInstanceOf[T]
}
}
有了反射功能,我們只需要將NetService傳入作為Service的型別,在宏中使用運行時反射構造物件即可,
// 這里即使與上面相比少了個service引數,但是因為編譯器識別時有點問題,會和上面那個多載的apply定義沖突,所以把泛型的位置改了下,把Service泛型放到最后,
val openSession = Processable[BOpenSessionReq, BOpenSessionResp, NetService](
(service, rpc, req) => {
import scala.jdk.CollectionConverters.MapHasAsScala
val username = req.getUsername
val password = req.getPassword
val configurationMap = req.getConfigurationMap
val ret = service.openSession(username, password, configurationMap.asScala.toMap)
BOpenSessionResp.newBuilder().setSessionHandle(ret).build()
},
(service, rpc, exception) => {
BOpenSessionResp.newBuilder().setStatus(exception.getLocalizedMessage).build()
}
)
到目前為止,再一般情況下,我們甚至只需要提供2個函式就能實作任意Processor的定義了,再也不用創建類了,哈哈,
宏的實作是比較難懂的,這里沒有貼代碼,感興趣的可以看看原始碼,https://github.com/jxnu-liguobin/scala-macro-tools/tree/master/src/main/scala/io/github/dreamylost/sofa,
如果對你有幫助可以點個star,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/374696.html
標籤:其他
上一篇:UDP服務器客戶端編程流程
