主頁 > 後端開發 > akka-grpc - 應用案例

akka-grpc - 應用案例

2020-09-10 01:56:29 後端開發

  上期說道:http/2還屬于一種不算普及的技術協議,可能目前只適合用于內部系統集成,現在開始大面積介入可能為時尚早,不過有些專案需求不等人,需要使用這項技術,所以研究了一下akka-grpc,寫了一篇介紹,本想到此為止,繼續其它專案,想想這樣做法有點不負責任,像是草草收場,畢竟用akka-grpc做了些事情,想想還是再寫這篇跟大家分享使用kka-grpc的程序,

我說過,了解akka-grpc的主要目的還是在protobuf的應用上,這是一種高效率的序列化協議,剛好,公司有這么個專案,是一個影像處理平臺:把很多圖片拍攝終端的影像傳上平臺進行商品識別、OCR等影像處理,由于終端數量多、影像處理又特別消耗記憶體、CPU等計算資源、又要求快速回應,所以第一考慮就是使用akka-cluster把影像處理任務分割到多個節點上并行處理,這里就需要仔細考慮圖片在終端到平臺、然后集群節點與點actor間的傳輸效率了,如何在akka系統里使用protobuf格式的資料正是本篇討論和示范的目的,

akka-grpc應用一般從IDL檔案里訊息型別和服務函式的定義開始,如下面這個.proto檔案示范:

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};


package com.datatech.pos.abs;

message UCredential {
  string userid = 1;
  string password = 2;
}

message JWToken {
  string jwt = 1;
}

message Picture {
  int32 num = 1;
  bytes blob = 2;
}
message Capture {
  string ean = 1;
  bytes cover1 = 2;
  bytes cover2 = 3;
}

message Book {
  string ean = 1;
  string ver = 2;
  string isbn = 3;
  string title = 4;
  string publisher = 5;
  double price = 6;
  bytes cover1 = 7;
  bytes cover2 = 8;
}

message QueryResult {
  int32  sts         = 1;
  string msg         = 2;
  Book bookinfo   = 3;
}

service Services {
  rpc GetAuthToken(UCredential) returns (JWToken) {};
  rpc SavePicture(Picture) returns (QueryResult) {};
  rpc GetPicture(Picture) returns (Picture) {};
//  rpc SaveCapture(Capture) returns (QueryResult) {};
//  rpc GetCapture(Capture) returns (Capture) {};
//  rpc GetBookInfo(Capture) returns (QueryResult) {};
}

因為這次示范針對的是protobuf的使用,所以就揀了SavePicture,GetPicture這兩項服務函式,JWToken只是用戶身份憑證,集群分片shard-entityId是以用戶憑證為基礎的,所以平臺需要通過JWT進行跨節點任務指派以實作分布式影像處理運算,

下面就要在編譯器插件自動產生的基礎服務介面代碼基礎上進行具體的服務功能實作,這部分主要是對介面函式的實作(oveerride):

class gRPCServices(trace: Boolean, system: ActorSystem, sharding: ClusterSharding)(
  implicit  waitResponseTimeout: Timeout, authenticator: AuthBase) extends ServicesPowerApi with LogSupport {
  implicit val ec = system.dispatcher
  log.stepOn = trace
  override def getAuthToken(request: UCredential, meta: Metadata): Future[JWToken] = {
    val entityRef = sharding.entityRefFor(Authenticator.EntityKey, UUID.randomUUID.toString)
    val jwtResp = for {
      ui <- entityRef.ask[Authenticator.Response](Authenticator.GetUserInfo(request.userid, _))
        .map {
          case Authenticator.UserInfo(info) => info
          case _ => Map[String, Any]()
        }
      jwt <- entityRef.ask[Authenticator.Response](Authenticator.GetToken(ui, _))
    } yield jwt

    jwtResp.map {
      case Authenticator.JWToken(jwt) =>
        if (jwt.nonEmpty) JWToken(jwt)
        else throw new Exception("身份驗證失敗!無法提供憑證,")
      case _ => throw new Exception("身份驗證失敗!無法提供憑證,")
    }
  }
  override def savePicture(in: Picture, metadata: Metadata): Future[QueryResult] = {
    val jwt = getJwt(metadata).getOrElse("")
    val ids = authenticator.shopIdFromJwt(jwt).getOrElse(("","","","",""))
    val (shopId, posId, termId, impurl,devId) = ids
    val entityRef = sharding.entityRefFor(ImgProcessor.EntityKey, s"$shopId:$posId")
    val futResp = entityRef.ask[ImgProcessor.Response](ImgProcessor.SaveImage(in, _))
      .map {
        case ImgProcessor.ValidImgPro(img) => QueryResult(sts = 0, msg = "picture saved.")
        case ImgProcessor.FailedImgPro(msg) => QueryResult(sts = -1, msg = msg)
      }
    futResp
  }

  override def getPicture(in: Picture, metadata: Metadata): Future[Picture] = {
    val jwt = getJwt(metadata).getOrElse("")
    val ids = authenticator.shopIdFromJwt(jwt).getOrElse(("","","","",""))
    val (shopId, posId, termId, impurl,devId) = ids
    val entityRef = sharding.entityRefFor(ImgProcessor.EntityKey, s"$shopId:$posId")
    val futResp = entityRef.ask[ImgProcessor.Response](ImgProcessor.GetImage(in.num, _))
      .map {
        case ImgProcessor.ValidImgPro(img) => img
        case ImgProcessor.FailedImgPro(msg) => Picture(-1, ByteString.EMPTY)
      }
    futResp
  }

  def getJwt(metadata: Metadata): Option[String] = {
    metadata.getText("bearer")
  }
}

由于是通過PowerApi模式產生的介面代碼,所以介面函式都帶有MetaData引數,代表HttpRequest header集合,可以看到:服務函式實作都是通過entityRef,一個分片調度器分配到集群某個節點ImgProcessor.EntityKey型別的entity-actor上進行的,shopId:posId就是代表為某用戶構建的entityId,這個是通過用戶在Request中提供的MetaData引數中jwt決議得出的,

可以看到,具體服務提供是通過集群的分片實作的,下面是這個分片的代碼示范:

      log.step(s"initializing sharding for ${ImgProcessor.EntityKey} ...")(MachineId("",""))
      val imgEntityType = Entity(ImgProcessor.EntityKey) { entityContext =>
        ImgProcessor(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)
      }.withStopMessage(ImgProcessor.StopWorker)
      sharding.init(imgEntityType)

上面imgEntityType就是shard-entity型別,其實就是按用戶提供的jwt在任意集群節點上實時構建的一個opencv影像處理器,下面是這個entity-actor的示范代碼:

object ImgProcessor extends LogSupport {
  sealed trait Command extends CborSerializable
  case class SaveImage(img: Picture, replyTo: ActorRef[Response]) extends Command
  case class GetImage(imgnum: Int,replyTo: ActorRef[Response]) extends Command

  sealed trait Response extends CborSerializable
  case class ValidImgPro(img: Picture) extends Response
  case class FailedImgPro(msg: String) extends Response

  def apply(shard: ActorRef[ClusterSharding.ShardCommand],mgoHosts: List[String], entityId: String, trace: Boolean, keepAlive: FiniteDuration): Behavior[Command] = {
    val (shopId,posId) = entityId.split(':').toList match {
      case sid::pid::Nil  => (sid,pid) }
    implicit val loc = Messages.MachineId(shopId,posId)
    log.stepOn = trace

    Behaviors.setup[Command] { ctx =>
      implicit val ec = ctx.executionContext
      ctx.setReceiveTimeout(keepAlive, Idle)
      Behaviors.withTimers[Command] { timer =>
        Behaviors.receiveMessage[Command] {
          case SaveImage(img, replyTo) =>
            log.step(s"ImgProcessor: SaveImage(${img.num})")
            implicit val client = mongoClient(mgoHosts)
            maybeMgoClient = Some(client)
            ctx.pipeToSelf(savePicture(img)) {
              case Success(_) => {
                  replyTo ! ValidImgPro(img)
                  Done(loc.shopid, loc.posid, s"saved image #${img.num}.")
              }
              case Failure(err) =>
                log.error(s"ImgProcessor: SaveImage Error: ${err.getMessage}")
                replyTo ! FailedImgPro(err.getMessage)
                Done(loc.shopid, loc.posid, s"SaveImage with error: ${err.getMessage}")
            }
            Behaviors.same
          case GetImage(imgnum, replyTo) =>
...

  }

}

整個圖片傳輸是通過actor的訊息實作的,akka訊息支持多種序列化格式,包括protobuf, 在組態檔.conf里定義:

akka {
  loglevel = INFO
  actor {
    provider = cluster
    serializers {
      jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "com.datatech.pos.abs.CborSerializable" = jackson-cbor
      "scalapb.GeneratedMessage" = proto
    }
  }
}

grpc server 基本上是個標準模塊,不同的只是service引數:

class gRPCServer(host: String, port: Int) extends LogSupport {
  def runServer(system: ActorSystem[_], service: gRPCServices): Future[Http.ServerBinding] = {
    implicit val classic = system.toClassic
    implicit val ec: ExecutionContext = system.executionContext

    // Create service handlers
    val serviceHandler: HttpRequest => Future[HttpResponse] =
      ServicesPowerApiHandler(service)

    // Bind service handler servers to localhost:8080/8081
    val binding = Http().bindAndHandleAsync(
      serviceHandler,
      interface = host,
      port = port,
      connectionContext = HttpConnectionContext())

    // report successful binding
    binding.foreach { binding => println(s"******* startup gRPC-server on: port = $port  *******") }

    binding

    //#server
  }
}

下面是客戶端測驗代碼:

object gRPCTestClient {

  def main(args: Array[String]): Unit = {
    val config_onenode = ConfigFactory.load("onenode")
    implicit val sys = ActorSystem("grpc-client", config_onenode)
    implicit val ec = sys.dispatcher
    val clientSettings = GrpcClientSettings.fromConfig(Services.name)
    //   val clientSettings = GrpcClientSettings.connectToServiceAt("192.168.11.189", 50052);
    implicit val client = ServicesClient(clientSettings)

    val futJwt = client.getAuthToken(UCredential("9013", "123456"))
    val jwt = Await.result(futJwt, 5.seconds).jwt
    println(s"got jwt: ${jwt}")
    scala.io.StdIn.readLine()

    val bytes = FileStreaming.FileToByteArray("books/59c10d099b26e.jpg")
    val mat = bytesToMat(bytes)
    show(mat,"sent picture")
    scala.io.StdIn.readLine()

    val picture = Picture(111,marshal(bytes))

    val futQR = client.savePicture().addHeader("Bearer", jwt).invoke(Picture(111,marshal(bytes)))
    futQR.onComplete {
      case Success(qr) => println(s"Saving Success: ${qr.msg}")
      case Failure(err) => println(s"Saving Error: ${err.getMessage}")
    }

    scala.io.StdIn.readLine()

    val futPic = client.getPicture().addHeader("Bearer", jwt).invoke(Picture(111,ByteString.EMPTY))
    futPic.onComplete {
      case Success(pic) =>
        val image = bytesToMat(unmarshal(pic.blob))
        show(image, s"picture:${pic.num}")
      case Failure(err) => println(s"Reading Error: ${err.getMessage}")
    }

    scala.io.StdIn.readLine()

    sys.terminate()
  }
}

基本流程是:先通過getAuthToken獲取jwt;在呼叫服務時通過addHeader("bearer",jwt)把jwt隨著函式呼叫一起提交給服務端,

客戶端設定可以在組態檔中定義:

akka {
  loglevel = INFO

  grpc.client {
    "com.datatech.pos.abs.Services" {
      host = 192.168.11.189
      port = 52051
      override-authority = foo.test.google.fr
      use-tls = false
    }
  }

}

 

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/426.html

標籤:Scala

上一篇:akka-grpc - 基于akka-http和akka-streams的scala gRPC開發工具

下一篇:Java/Scala 中三種特殊型別的值

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more