歡迎訪問我的GitHub
https://github.com/zq2599/blog_demos
內容:所有原創文章分類匯總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
本篇概覽
- 本文是《java版gRPC實戰》系列的第五篇,目標是掌握雙向流型別的服務,即請求引數是流的形式,回應的內容也是流的形式;
- 先來看看官方資料對雙向流式RPC的介紹:是雙方使用讀寫流去發送一個訊息序列,兩個流獨立操作,因此客戶端和服務器 可以以任意喜歡的順序讀寫:比如, 服務器可以在寫入回應前等待接收所有的客戶端訊息,或者可以交替 的讀取和寫入訊息,或者其他讀寫的組合, 每個流中的訊息順序被預留;
- 掌握了客戶端流和服務端流兩種型別的開發后,雙向流型別就很好理解了,就是之前兩種型別的結合體,請求和回應都按照流的方式處理即可;
- 今天的實戰,咱們來設計一個在線商城的功能:批量減扣庫存,即客戶端提交多個商品和數量,服務端回傳每個商品減扣庫存成功和失敗的情況;
- 咱們盡快進入編碼環節吧,具體內容如下:
- 在proto檔案中定義雙向流型別的gRPC介面,再通過proto生成java代碼
- 開發服務端應用
- 開發客戶端應用
- 驗證
原始碼下載
- 本篇實戰中的完整原始碼可在GitHub下載到,地址和鏈接資訊如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 鏈接 | 備注 |
|---|---|---|
| 專案主頁 | https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
| git倉庫地址(https) | https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
| git倉庫地址(ssh) | [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
- 這個git專案中有多個檔案夾,《java版gRPC實戰》系列的原始碼在grpc-tutorials檔案夾下,如下圖紅框所示:

- grpc-tutorials檔案夾下有多個目錄,本篇文章對應的服務端代碼在double-stream-server-side目錄下,客戶端代碼在double-stream-client-side目錄下,如下圖:

在proto檔案中定義雙向流型別的gRPC介面
- 首先要做的就是定義gRPC介面,打開mall.proto,在里面新增方法和相關的資料結構,需要重點關注的是BatchDeduct方法的入參ProductOrder和回傳值DeductReply都添加了stream修飾(ProductOrder是上一章定義的),代表該方法是雙向流型別:
// gRPC服務,這是個在線商城的庫存服務
service StockService {
// 雙向流式:批量扣減庫存
rpc BatchDeduct (stream ProductOrder) returns (stream DeductReply) {}
}
// 扣減庫存回傳結果的資料結構
message DeductReply {
// 回傳碼
int32 code = 1;
// 描述資訊
string message = 2;
}
- 雙擊下圖紅框中的task即可生成java代碼:

- 生成下圖紅框中的檔案,即服務端定義和回傳值資料結構:

- 接下來開發服務端;
開發服務端應用
- 在父工程grpc-turtorials下面新建名為double-stream-server-side的模塊,其build.gradle內容如下:
// 使用springboot插件
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
// 作為gRPC服務提供方,需要用到此庫
implementation 'net.devh:grpc-server-spring-boot-starter'
// 依賴自動生成原始碼的工程
implementation project(':grpc-lib')
// annotationProcessor不會傳遞,使用了lombok生成代碼的模塊,需要自己宣告annotationProcessor
annotationProcessor 'org.projectlombok:lombok'
}
- 組態檔application.yml:
spring:
application:
name: double-stream-server-side
# gRPC有關的配置,這里只需要配置服務埠號
grpc:
server:
port: 9901
- 啟動類DoubleStreamServerSideApplication.java的代碼就不貼了,普通的springboot啟動類而已;
- 重點是提供grpc服務的GrpcServerService.java,咱們要做的就是給上層框架回傳一個匿名類,至于里面的onNext、onCompleted方法何時被呼叫是上層框架決定的,另外還準備了成員變數totalCount,這樣就可以記錄總數了,由于請求引數是流,因此匿名類的onNext會被多次呼叫,并且由于回傳值是流,因此onNext中呼叫了responseObserver.onNext方法來回應流中的每個請求,這樣客戶端就不斷收到服務端的回應資料(即客戶端的onNext方法會被多次呼叫):
package grpctutorials;
import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
@GrpcService
@Slf4j
public class GrpcServerService extends StockServiceGrpc.StockServiceImplBase {
@Override
public StreamObserver<ProductOrder> batchDeduct(StreamObserver<DeductReply> responseObserver) {
// 回傳匿名類,給上層框架使用
return new StreamObserver<ProductOrder>() {
private int totalCount = 0;
@Override
public void onNext(ProductOrder value) {
log.info("正在處理商品[{}],數量為[{}]",
value.getProductId(),
value.getNumber());
// 增加總量
totalCount += value.getNumber();
int code;
String message;
// 假設單數的都有庫存不足的問題
if (0 == value.getNumber() % 2) {
code = 10000;
message = String.format("商品[%d]扣減庫存數[%d]成功", value.getProductId(), value.getNumber());
} else {
code = 10001;
message = String.format("商品[%d]扣減庫存數[%d]失敗", value.getProductId(), value.getNumber());
}
responseObserver.onNext(DeductReply.newBuilder()
.setCode(code)
.setMessage(message)
.build());
}
@Override
public void one rror(Throwable t) {
log.error("批量減扣庫存例外", t);
}
@Override
public void onCompleted() {
log.info("批量減扣庫存完成,共計[{}]件商品", totalCount);
responseObserver.onCompleted();
}
};
}
}
開發客戶端應用
- 在父工程grpc-turtorials下面新建名為double-stream-server-side的模塊,其build.gradle內容如下:
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'net.devh:grpc-client-spring-boot-starter'
implementation project(':grpc-lib')
}
- 組態檔application.yml,設定自己的web埠號和服務端地址:
server:
port: 8082
spring:
application:
name: double-stream-client-side
grpc:
client:
# gRPC配置的名字,GrpcClient注解會用到
double-stream-server-side:
# gRPC服務端地址
address: 'static://127.0.0.1:9901'
enableKeepAlive: true
keepAliveWithoutCalls: true
negotiationType: plaintext
-
啟動類DoubleStreamClientSideApplication.java的代碼就不貼了,普通的springboot啟動類而已;
-
正常情況下我們都是用StreamObserver處理服務端回應,這里由于是異步回應,需要額外的方法從StreamObserver中取出業務資料,于是定一個新介面,繼承自StreamObserver,新增getExtra方法可以回傳String物件,詳細的用法稍后會看到:
package com.bolingcavalry.grpctutorials;
import io.grpc.stub.StreamObserver;
public interface ExtendResponseObserver<T> extends StreamObserver<T> {
String getExtra();
}
- 重頭戲來了,看看如何遠程呼叫雙向流型別的gRPC介面,代碼中已經添加詳細注釋:
package grpctutorials;
import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class GrpcClientService {
@GrpcClient("double-stream-server-side")
private StockServiceGrpc.StockServiceStub stockServiceStub;
/**
* 批量減庫存
* @param count
* @return
*/
public String batchDeduct(int count) {
CountDownLatch countDownLatch = new CountDownLatch(1);
// responseObserver的onNext和onCompleted會在另一個執行緒中被執行,
// ExtendResponseObserver繼承自StreamObserver
ExtendResponseObserver<DeductReply> responseObserver = new ExtendResponseObserver<DeductReply>() {
// 用stringBuilder保存所有來自服務端的回應
private StringBuilder stringBuilder = new StringBuilder();
@Override
public String getExtra() {
return stringBuilder.toString();
}
/**
* 客戶端的流式請求期間,每一筆請求都會收到服務端的一個回應,
* 對應每個回應,這里的onNext方法都會被執行一次,入參是回應內容
* @param value
*/
@Override
public void onNext(DeductReply value) {
log.info("batch deduct on next");
// 放入匿名類的成員變數中
stringBuilder.append(String.format("回傳碼[%d],回傳資訊:%s<br>" , value.getCode(), value.getMessage()));
}
@Override
public void one rror(Throwable t) {
log.error("batch deduct gRPC request error", t);
stringBuilder.append("batch deduct gRPC error, " + t.getMessage());
countDownLatch.countDown();
}
/**
* 服務端確認回應完成后,這里的onCompleted方法會被呼叫
*/
@Override
public void onCompleted() {
log.info("batch deduct on complete");
// 執行了countDown方法后,前面執行countDownLatch.await方法的執行緒就不再wait了,
// 會繼續往下執行
countDownLatch.countDown();
}
};
// 遠程呼叫,此時資料還沒有給到服務端
StreamObserver<ProductOrder> requestObserver = stockServiceStub.batchDeduct(responseObserver);
for(int i=0; i<count; i++) {
// 每次執行onNext都會發送一筆資料到服務端,
// 服務端的onNext方法都會被執行一次
requestObserver.onNext(build(101 + i, 1 + i));
}
// 客戶端告訴服務端:資料已經發完了
requestObserver.onCompleted();
try {
// 開始等待,如果服務端處理完成,那么responseObserver的onCompleted方法會在另一個執行緒被執行,
// 那里會執行countDownLatch的countDown方法,一但countDown被執行,下面的await就執行完畢了,
// await的超時時間設定為2秒
countDownLatch.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("countDownLatch await error", e);
}
log.info("service finish");
// 服務端回傳的內容被放置在requestObserver中,從getExtra方法可以取得
return responseObserver.getExtra();
}
/**
* 創建ProductOrder物件
* @param productId
* @param num
* @return
*/
private static ProductOrder build(int productId, int num) {
return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
}
}
- 最后做個web介面,可以通過web請求驗證遠程呼叫:
package grpctutorials;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class GrpcClientController {
@Autowired
private GrpcClientService grpcClientService;
@RequestMapping("/")
public String printMessage(@RequestParam(defaultValue = "https://www.cnblogs.com/bolingcavalry/p/1") int count) {
return grpcClientService.batchDeduct(count);
}
}
- 編碼完成,開始驗證;
驗證
- 啟動服務端DoubleStreamServerSideApplication:

- 啟動客戶端DoubleStreamClientSideApplication:

- 這里要改:瀏覽器輸入http://localhost:8083/?count=10,回應如下,可見遠程呼叫gRPC服務成功,流式回應的每一筆回傳都被客戶端收到:

- 下面是服務端日志,可見逐一處理了客戶端的每一筆資料:

- 下面是客戶端日志,可見由于CountDownLatch的作用,發起gRPC請求的執行緒一直等待responseObserver.onCompleted在另一個執行緒被執行完后,才會繼續執行:

- 至此,四種型別的gRPC服務及其客戶端開發就完成了,一般的業務場景咱們都能應付自如,接下來的文章咱們會繼續深入學習,了解復雜場景下的gRPC操作;
你不孤單,欣宸原創一路相伴
- Java系列
- Spring系列
- Docker系列
- kubernetes系列
- 資料庫+中間件系列
- DevOps系列
歡迎關注公眾號:程式員欣宸
微信搜索「程式員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/300843.html
標籤:Java
上一篇:面試官:Redis 單執行緒已經很快,為何 6.0要引入多執行緒?有啥優勢?
下一篇:JVM與GC調優《大廠學院》
