rpc的原理:
將api序列化為字串,作為method_type,呼叫Id(流水號),api的引數和回傳結果作為args,作為整體再序列化為json,protobuf、cob等編碼方式,通過tcp、http、傳給服務端,
服務端進行解包,查找注冊表,查找api并傳參,結果通過網路回傳給客戶端,
rpc各部分邏輯功能:
- Application是方法注冊、呼叫
- client stub是注冊表,也就是k-v,例如 “func add(a, b int) int”:object.add()
- Client Runtime Library是session管理
- Transport是資料編碼(json、protobuf)、標準的網路傳輸層協議打包

代碼
server端:
package main
import (
"fmt"
"log"
"net"
"net/http"
"net/rpc"
"time"
)
type Req struct {
Data []byte
}
type Reply struct {
Success string
}
type DiskQueue int
func (t *DiskQueue) Put(args *Req, Reply *Reply) error {
fmt.Println(time.Now(), string(args.Data))
Reply.Success = "yes"
return nil
}
func main() {
arith := new(DiskQueue)
rpc.Register(arith)
rpc.HandleHTTP()
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
http.Serve(l, nil)
}
client端:
package main
import (
"errors"
"fmt"
"net/rpc"
"sync"
"time"
"github.com/gwaylib/log"
)
type Req struct {
Data []byte
}
type Reply struct {
Success string
}
// 同步呼叫rpc
func PostRpc(data []byte) error {
if rpcc == nil {
return errors.New("rpc client is nil")
}
req := Req{
Data: data,
}
var res Reply
err := rpcc.Call("DiskQueue.Put", req, &res)
if err != nil {
log.Error("rpc --------------------Err -------", err)
return err
}
log.Info("----------", res.Success)
return nil
}
var msgs = make(chan []byte, 1024)
var rpcc *rpc.Client
func init() {
conn, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
if err == nil {
rpcc = conn
}
}
func SendMsgLoop() {
wg.Add(1)
for {
select {
case msg := <-msgs:
err := PostRpc(msg)
if err != nil {
Waterlevel := 8
Threshold := 1 << 15
for {
if rpcc != nil {
rpcc.Close()
}
fmt.Println("reconnect rpc server ...>>>")
time.Sleep(time.Millisecond * time.Duration(Waterlevel))
conn, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
if err == nil {
rpcc = conn
break
} else {
fmt.Printf("Waterlevel=%+v, Threshold=%+v\n", Waterlevel, Threshold)
if Waterlevel < Threshold {
Waterlevel = Waterlevel << 1
} else if Waterlevel >= Threshold {
fmt.Println("超過最大次數,跳出本次rpc重連")
break
}
}
}
}
time.Sleep(time.Millisecond * 10)
}
}
defer wg.Done()
}
// 如果滿,則丟棄一個舊的訊息
func SendMsg(msg []byte) {
if len(msgs) > 1000 {
<-msgs
}
msgs <- msg
}
// 向訊息佇列發訊息
func PutMsgLoop() {
wg.Add(1)
for i := 0; i < 2000000; i++ {
time.Sleep(time.Millisecond * 10) // 延時,以便觀察斷開server, 再開啟server后,查看訊息是否丟棄舊訊息
SendMsg([]byte(fmt.Sprintf("hello, %d", i)))
}
defer wg.Done()
}
var wg sync.WaitGroup
func main() {
go PutMsgLoop() // 訊息生產測驗,20萬條訊息
go SendMsgLoop() //單獨的go routine 執行rpc 客戶端監聽訊息佇列,有訊息則發送,失敗重連,超時則丟這條訊息
time.Sleep(time.Second * 2)
wg.Wait()
}
運行效果:

轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/274789.html
標籤:區塊鏈
上一篇:聚焦 | TJWallet數字硬體錢包閃耀CITE2021
下一篇:《區塊鏈 基礎知識25講》筆記
