akka 2.6.x正式發布以來已經有好一段時間了,核心變化是typed-actor的正式啟用,當然persistence,cluster等模塊也有較大變化,一開始從名稱估摸就是把傳統any型別的訊息改成強型別訊息,所以想拖一段時間看看到底能對我們現有基于akka-classic的應用軟體有什么深層次的影響,不過最近考慮的一些系統架構逼的我不得不立即開始akka-typed的調研,也就是說akka-classic已經無法或者很困難去實作新的系統架構,且聽我道來:最近在考慮一個微服務中臺,作為后臺資料服務呼叫的唯一入口,平臺應該是個分布式軟體,那么采用akka-cluster目前是唯一的選擇,畢竟前期搞過很多基于akka-cluster的應用軟體,但是,akka-cluster-sharding只能支持一種entity actor,畢竟,由于akka-classic的訊息是沒有型別的,只能在收到訊息后再通過型別模式匹配的方式確定應該運行的代碼,所以,這個actor必須包括所有的業務邏輯處理運算,也就是說對于一個大型應用來說這就是一塊巨型代碼,還有,如果涉及到維護actor狀態的話,比如persistenceActor,或者綜合型別業務運算,那么又需要多少種類的資料結構,又怎樣去維護、管理這些結構呢?對我來說這基本上是mission-impossible,實際上logom應該正符合這個中臺的要求:cluster-sharding, CQRS... 抱著一種好奇的心態了解了一下lagom原始碼,忽然恍然大悟:這個東西是基于akka-typed的!想想看也是:如果我們可以把actor和訊息型別綁在一起,那么我們就可以通過訊息型別對應到某種actor,也就是說基于akka-typed,我們可以把綜合性的業務劃分成多個actor模塊,然后我們可以指定那種actor做那些事情,當然,經過了功能細分,actor的設計也簡單了許多,現在這個新的中臺可以實作前臺應用直接呼叫對應的actor處理業務了,不用多想了,這注定就是akka應用的將來,還等什么呢?
先從一個最簡單的hello程式開始吧:基本上是兩個actor相互交換訊息,先用第一個來示范標準的actor構建程序:
object HelloActor {
sealed trait Request
case class Greeting(fromWhom: String, replyTo: ActorRef[Greeter.Greeted]) extends Request
def apply(): Behavior[Greeting] = {
Behaviors.receive { (ctx, greeter) =>
ctx.log.info("receive greeting from {}", greeter.fromWhom)
greeter.replyTo ! Greeter.Greeted(s"hello ${greeter.fromWhom}!")
Behaviors.same
}
}
}
akka-typed的actor構建是通過定義它的Behavior行為實作的,特別的是型別引數Behavior[Greeting],代表這個actor只處理Greeting型別的訊息,因而是個typed-actor,akka-typed已經不支持sender()了,在訊息里自帶,如Greeting.replyTo,Behavior定義是通過工廠模式Behaviors實作的,看看Behaviors的定義:
/**
* Factories for [[akka.actor.typed.Behavior]].
*/
object Behaviors {
def setup[T](factory: ActorContext[T] => Behavior[T]): Behavior[T]
def withStash[T](capacity: Int)(factory: StashBuffer[T] => Behavior[T]): Behavior[T]
def same[T]: Behavior[T]
def unhandled[T]: Behavior[T]
def stopped[T]: Behavior[T]
def stopped[T](postStop: () => Unit): Behavior[T]
def empty[T]: Behavior[T]
def ignore[T]: Behavior[T]
def receive[T](onMessage: (ActorContext[T], T) => Behavior[T]): Receive[T]
def receiveMessage[T](onMessage: T => Behavior[T]): Receive[T]
def receivePartial[T](onMessage: PartialFunction[(ActorContext[T], T), Behavior[T]]): Receive[T]
def receiveMessagePartial[T](onMessage: PartialFunction[T, Behavior[T]]): Receive[T]
def receiveSignal[T](handler: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T]
def supervise[T](wrapped: Behavior[T]): Supervise[T]
def withTimers[T](factory: TimerScheduler[T] => Behavior[T]): Behavior[T]
...
}
上面的構建函式除回傳Behavior[T]外還有Receive[T]和Supervise[T],這兩個型別是什么?它們還是Behavior[T]:
trait Receive[T] extends Behavior[T] {
def receiveSignal(onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T]
}
def supervise[T](wrapped: Behavior[T]): Supervise[T] =
new Supervise[T](wrapped)
private final val ThrowableClassTag = ClassTag(classOf[Throwable])
final class Supervise[T] private[akka] (val wrapped: Behavior[T]) extends AnyVal {
/** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */
def onFailure[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy): Behavior[T] = {
val tag = classTag[Thr]
val effectiveTag = if (tag == ClassTag.Nothing) ThrowableClassTag else tag
Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag)
}
}
注意,Supervise.onFailure回傳了Behavior[T],
helloActor的Behavior是通過Behaviors.receive構建的,還可以用setup,receiveMessage來構建,注意:構建函式的入引數也是Behavior[T],所以這些構造器可以一層層嵌套著使用,setup,receive為函式內層提供了ActorContext, withTimers提供TimerScheduler[T],那么我可以把HelloActor的功能再完善點,加個監管策略SupervisorStrategy:
object HelloActor {
sealed trait Request
case class Greeting(fromWhom: String, replyTo: ActorRef[Greeter.Greeted]) extends Request
def apply(): Behavior[Greeting] = {
Behaviors.supervise(
Behaviors.receive[Greeting] { (ctx, greeter) =>
ctx.log.info("receive greeting from {}", greeter.fromWhom)
greeter.replyTo ! Greeter.Greeted(s"hello ${greeter.fromWhom}!")
Behaviors.same
}
).onFailure(SupervisorStrategy.restartWithBackoff(10.seconds, 1.minute, 0.20))
}
}
在akka-typed里,actor監管已經從父輩轉到自身,再就是增加了BackOff-SupervisorStrategy,不需要獨立的BackOffSupervisor actor了,
再看看另一個Greeter:
object Greeter {
sealed trait Response
case class Greeted(hello: String) extends Response
def apply(): Behavior[Greeted] = {
Behaviors.setup ( ctx =>
Behaviors.receiveMessage { message =>
ctx.log.info(message.hello)
Behaviors.same
}
)
}
}
這個跟HelloActor沒什么不同,不過用了setup,receiveMessage套裝,值得注意的是Greeter負責處理Greeted訊息,這是一個不帶sender ActorRef的型別,意味著處理這類訊息后不需要答復訊息發送者,
然后還需要一個actor來構建上面兩個actor實體,啟動對話:
object GreetStarter {
sealed trait Command
case class SayHiTo(whom: String) extends Command
case class RepeatedGreeting(whom: String, interval: FiniteDuration) extends Command
def apply(): Behavior[Command] = {
Behaviors.setup[Command] { ctx =>
val helloActor = ctx.spawn(HelloActor(), "hello-actor")
val greeter = ctx.spawn(Greeter(), "greeter")
Behaviors.withTimers { timer =>
new GreetStarter(
helloActor,greeter,ctx,timer)
.repeatGreeting(1,3)
}
}
}
}
class GreetStarter private (
helloActor: ActorRef[HelloActor.Greeting],
greeter: ActorRef[Greeter.Greeted],
ctx: ActorContext[GreetStarter.Command],
timer: TimerScheduler[GreetStarter.Command]){
import GreetStarter._
private def repeatGreeting(count: Int, max: Int): Behavior[Command] =
Behaviors.receiveMessage { msg =>
msg match {
case RepeatedGreeting(whom, interval) =>
ctx.log.info2("start greeting to {} with interval {}", whom, interval)
timer.startSingleTimer(SayHiTo(whom), interval)
Behaviors.same
case SayHiTo(whom) =>
ctx.log.info2("{}th time greeting to {}",count,whom)
if (max == count)
Behaviors.stopped
else {
helloActor ! HelloActor.Greeting(whom, greeter)
repeatGreeting(count + 1, max)
}
}
}
}
上面這個例子有點復雜,邏輯也有些問題,主要是為了示范一種函式式actor構建模式及actor狀態轉換虛構出來的,akka-typed已經不再支持become方法了,
最后,需要一個相當于main這么一個頂層的程式:
def main(args: Array[String]) {
val man: ActorSystem[GreetStarter.Command] = ActorSystem(GreetStarter(), "greetDemo")
man ! GreetStarter.RepeatedGreeting("Tiger",5.seconds)
man ! GreetStarter.RepeatedGreeting("Peter",5.seconds)
man ! GreetStarter.RepeatedGreeting("Susanna",5.seconds)
}
akka-classic的頂級actor,即: /users是由系統默認創建的,akka-typed需要用戶提供這個頂層actor,這個是在ActorSystem的第一個引數指定的,我們再看看akka-typed的ActorSystem的構建函式:
object ActorSystem {
/**
* Scala API: Create an ActorSystem
*/
def apply[T](guardianBehavior: Behavior[T], name: String): ActorSystem[T] =
createInternal(name, guardianBehavior, Props.empty, ActorSystemSetup.create(BootstrapSetup()))
/**
* Scala API: Create an ActorSystem
*/
def apply[T](guardianBehavior: Behavior[T], name: String, config: Config): ActorSystem[T] =
createInternal(name, guardianBehavior, Props.empty, ActorSystemSetup.create(BootstrapSetup(config)))
/**
* Scala API: Create an ActorSystem
*/
def apply[T](guardianBehavior: Behavior[T], name: String, config: Config, guardianProps: Props): ActorSystem[T] =
createInternal(name, guardianBehavior, guardianProps, ActorSystemSetup.create(BootstrapSetup(config)))
...
}
其中一個apply與akka-classic的ActorSystem構建方式很相似:
def main(args: Array[String]) {
val config = ConfigFactory.load("application.conf")
val man: ActorSystem[GreetStarter.Command] = ActorSystem(GreetStarter(), "greetDemo",config)
man ! GreetStarter.RepeatedGreeting("Tiger",5.seconds)
man ! GreetStarter.RepeatedGreeting("Peter",5.seconds)
man ! GreetStarter.RepeatedGreeting("Susanna",5.seconds)
}
下面是本次討論的完整源代碼:
build.sbt
name := "learn-akka-typed" version := "0.1" scalaVersion := "2.13.2" lazy val akkaVersion = "2.6.5" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion, "ch.qos.logback" % "logback-classic" % "1.2.3" ) fork in Test := true
Lesson01.scala
import akka.actor.typed._ import scaladsl._ import scala.concurrent.duration._ import com.typesafe.config._ object Lesson01 { object HelloActor { sealed trait Request case class Greeting(fromWhom: String, replyTo: ActorRef[Greeter.Greeted]) extends Request def apply(): Behavior[Greeting] = { Behaviors.supervise( Behaviors.receive[Greeting] { (ctx, greeter) => ctx.log.info("receive greeting from {}", greeter.fromWhom) greeter.replyTo ! Greeter.Greeted(s"hello ${greeter.fromWhom}!") Behaviors.same } ).onFailure(SupervisorStrategy.restartWithBackoff(10.seconds, 1.minute, 0.20)) } } object Greeter { sealed trait Response case class Greeted(hello: String) extends Response def apply(): Behavior[Greeted] = { Behaviors.setup ( ctx => Behaviors.receiveMessage { message => ctx.log.info(message.hello) Behaviors.same } ) } } object GreetStarter { sealed trait Command case class SayHiTo(whom: String) extends Command case class RepeatedGreeting(whom: String, interval: FiniteDuration) extends Command def apply(): Behavior[Command] = { Behaviors.setup[Command] { ctx => val helloActor = ctx.spawn(HelloActor(), "hello-actor") val greeter = ctx.spawn(Greeter(), "greeter") Behaviors.withTimers { timer => new GreetStarter( helloActor,greeter,ctx,timer) .repeatGreeting(1,3) } } } } class GreetStarter private ( helloActor: ActorRef[HelloActor.Greeting], greeter: ActorRef[Greeter.Greeted], ctx: ActorContext[GreetStarter.Command], timer: TimerScheduler[GreetStarter.Command]){ import GreetStarter._ private def repeatGreeting(count: Int, max: Int): Behavior[Command] = Behaviors.receiveMessage { msg => msg match { case RepeatedGreeting(whom, interval) => ctx.log.info2("start greeting to {} with interval {}", whom, interval) timer.startSingleTimer(SayHiTo(whom), interval) Behaviors.same case SayHiTo(whom) => ctx.log.info2("{}th time greeting to {}",count,whom) if (max == count) Behaviors.stopped else { helloActor ! HelloActor.Greeting(whom, greeter) repeatGreeting(count + 1, max) } } } } def main(args: Array[String]) { val config = ConfigFactory.load("application.conf") val man: ActorSystem[GreetStarter.Command] = ActorSystem(GreetStarter(), "greetDemo",config) man ! GreetStarter.RepeatedGreeting("Tiger",5.seconds) man ! GreetStarter.RepeatedGreeting("Peter",5.seconds) man ! GreetStarter.RepeatedGreeting("Susanna",5.seconds) } }
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/413.html
標籤:Scala
