借助gRPC我們可以實作不同行程間通信模式(也稱RPC風格),
repeated 關鍵字
message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
使用repeated表明這個欄位在訊息中可以重復出現多次,包括0次,編譯成go,結構體會表示成一個切片,
一元RPC模式
01 初識gRPC,感受gRPC的強大魅力 - 小能日記 - 博客園
一元RPC模式也被稱為簡單RPC模式,在該模式中,當客戶端呼叫服務器端的遠程方法時,客戶端發送請求至服務器端并獲得一個回應,與回應一起發送的還有狀態細節以及trailer元資料,
rpc addOrder(Order) returns (google.protobuf.StringValue);
rpc getOrder(google.protobuf.StringValue) returns (Order);
編譯后
func (s *server) GetOrder(ctx context.Context, orderId *wrapper.StringValue) (*pb.Order, error) {
其中Context物件傳遞到方法中是因為其包含了一些用于控制gRPC行為的構造,比如截止時間和取消功能,
服務器端流RPC模式
服務端在接收到客戶端的請求訊息后,會發回一個回應的序列,這種多個回應所組成的序列也被稱為”流“,
在將所有的服務端回應發送完畢之后,服務端會以trailer元資料的形式將其狀態發送給客戶端,從而標記流的結束,
訂單服務的客戶端發出一個請求之后,會接收到多條回應訊息,
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
通過使用 returns (stream Order) 將回傳引數指定為訂單的流,編譯后
func (s *server) SearchOrders(searchQuery *wrappers.StringValue, stream pb.OrderManagement_SearchOrdersServer) error {
for key, order := range orderMap {
for _, itemStr := range order.Items {
if strings.Contains(itemStr, searchQuery.Value) {
err := stream.Send(&order)
// 需要處理將訊息以流的形式發送給客戶端的程序中可能出現的錯誤
if err != nil {
return fmt.Errorf("error sending message to stream : %v", err)
}
log.Print("Matching Order Found : " + key)
break
}
}
}
return nil
}
pb.OrderManagement_SearchOrdersServer 是服務端流的寫入物件,可以寫入多個回應,
客戶端代碼使用Recv方法從客戶端流中檢索訊息,并且持續檢索,直到流結束為止,即 io.EOF
searchStream, _ := client.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})
for {
searchOrder, err := searchStream.Recv()
if err == io.EOF {
log.Print("EOF")
break
}
if err == nil {
log.Print("Search Result : ", searchOrder)
}
}
客戶端流RPC模式
客戶端會發送多個請求給服務端,服務端可以隨時結束接識訓接收所有訊息后再發送回應,
rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
編譯為
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
ordersStr := "Updated Order IDs : "
for {
order, err := stream.Recv()
if err == io.EOF {
// 客戶端已發送完畢,服務器可以回應
return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed " + ordersStr})
}
if err != nil {
return err
}
orderMap[order.Id] = *order
log.Printf("Order ID : %s - %s", order.Id, "Updated")
ordersStr += order.Id + ", "
}
}
pb.OrderManagement_UpdateOrdersServer是客戶端傳入訊息流的參考物件,
服務端呼叫該物件的SendAndClose方法可以發送回應,同時標記服務器端訊息終結了流,
客戶端呼叫物件的CloseAndRecv方法可以關閉流并接收回應,
updateStream, err := client.UpdateOrders(ctx)
if err != nil {
log.Fatalf("%v.UpdateOrders(_) = _, %v", client, err)
}
if err := updateStream.Send(&updOrder1); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder1, err)
}
if err := updateStream.Send(&updOrder2); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder2, err)
}
if err := updateStream.Send(&updOrder3); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder3, err)
}
// 結束流并等待服務端回應
updateRes, err := updateStream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)
}
log.Printf("Update Orders Res : %s", updateRes)
雙向流模式
雙向流模式中,客戶端以訊息流的形式發送請求到服務端,服務端也以訊息流的形式進行回應,呼叫必須由客戶端發起,流的操作完全獨立,客戶端和服務端可以按照任意順序進行讀取和寫入,
rpc processOrders(stream google.protobuf.StringValue) returns (stream CombinedShipment);
一旦呼叫RPC方法,那么無論是客戶端還是服務端,都可以在任意時間發送訊息,這也包括來自任意一段的流結束標記,編譯后
func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {
...
}
pb.OrderManagement_ProcessOrdersServer 是客戶端和服務器端之間訊息流的物件參考,既可以Recv方法讀取,也可以Send方法寫入,
客戶端代碼中可開啟兩個執行緒分別用于發送訊息流和讀取訊息流,呼叫流參考物件的CloseSend方法可以關閉當前流并通知另一端,但另一端并未關閉,還可以發送資料,
...
streamProcOrder, err := client.ProcessOrders(ctx)
if err != nil {
log.Fatalf("%v.ProcessOrders(_) = _, %v", client, err)
}
if err := streamProcOrder.Send(&wrapper.StringValue{Value:"102"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", client, "102", err)
}
if err := streamProcOrder.Send(&wrapper.StringValue{Value:"103"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", client, "103", err)
}
if err := streamProcOrder.Send(&wrapper.StringValue{Value:"104"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", client, "104", err)
}
channel := make(chan struct{})
go asncClientBidirectionalRPC(streamProcOrder, channel)
time.Sleep(time.Millisecond * 1000)
if err := streamProcOrder.Send(&wrapper.StringValue{Value:"101"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", client, "101", err)
}
if err := streamProcOrder.CloseSend(); err != nil {
log.Fatal(err)
}
// 用channel保證main在讀取訊息流的go程結束后再結束
channel <- struct{}{}
}
func asncClientBidirectionalRPC(streamProcOrder pb.OrderManagement_ProcessOrdersClient, c chan struct{}) {
for {
combinedShipment, errProcOrder := streamProcOrder.Recv()
if errProcOrder == io.EOF {
break
}
log.Printf("Combined shipment : ", combinedShipment.OrdersList)
}
<-c
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/509115.html
標籤:其他
上一篇:【微服務】Nacos初體驗
下一篇:Redisson多策略注解限流
