主頁 > 後端開發 > akka-typed(6) - cluster:group router, cluster-load-balancing

akka-typed(6) - cluster:group router, cluster-load-balancing

2020-09-10 01:56:12 後端開發

先談談akka-typed的router actor,route 分pool router, group router兩類,我們先看看pool-router的使用示范:

      val pool = Routers.pool(poolSize = 4)(
        // make sure the workers are restarted if they fail
        Behaviors.supervise(WorkerRoutee()).onFailure[Exception](SupervisorStrategy.restart))
      val router = ctx.spawn(pool, "worker-pool")

      (0 to 10).foreach { n =>
        router ! WorkerRoutee.DoLog(s"msg $n")
      }

上面例子里的pool是個pool-router,意思是一個有4個routees的routee池,每個routee都是通過WorkerRoutee()構建的,意味著routee池中只有一個種類的actor,pool-router是通過工廠方法直接在本地(JVM)構建(spawn)所有的routee,也就是說所有routee都是router的子actor,

再看看group-router的使用例子:

val serviceKey = ServiceKey[Worker.Command]("log-worker")

      // this would likely happen elsewhere - if we create it locally we
      // can just as well use a pool
      val workerRoutee = ctx.spawn(WorkerRoutee(), "worker-route")
      ctx.system.receptionist ! Receptionist.Register(serviceKey, workerRoutee)

      val group = Routers.group(serviceKey)
      val router = ctx.spawn(group, "worker-group")

      // the group router will stash messages until it sees the first listing of registered
      // services from the receptionist, so it is safe to send messages right away
      (0 to 10).foreach { n =>
        router ! WorkerRoutee.DoLog(s"msg $n")
      }

group-router與pool-router有較多分別:

1、routee是在router之外構建的,router是用一個key通過Receptionist獲取同key的actor清單作為routee group的

2、Receptionist是集群全域的,任何節點上的actor都可以發送注冊訊息在Receptionist上登記

3、沒有size限制,任何actor一旦在Receptionist上登記即變成routee,接受router管理

應該說如果想把運算任務分配在集群里的各節點上并行運算實作load-balance效果,group-router是最合適的選擇,不過對不同的運算任務需要多少routee則需要用戶自行決定,不像以前akka-classic里通過cluster-metrics根據節點負載情況自動增減routee實體那么方便,

Receptionist: 既然說到,那么就再深入一點介紹Receptionist的應用:上面提到,Receptionist是集群全域的,就是說任何節點上的actor都可以在Receptonist上注冊形成一個生存在集群中不同節點的actor清單,如果Receptionist把這個清單提供給一個用戶,那么這個用戶就可以把運算任務配置到各節點上,實作某種意義上的分布式運算模式,Receptionist的使用方式是:通過向本節點的Receptionist發送訊息去登記ActorRef,然后通過Receptionist發布的登記變化訊息即可獲取最新的ActorRef清單:

  val WorkerServiceKey = ServiceKey[Worker.TransformText]("Worker")

  ctx.system.receptionist ! Receptionist.Register(WorkerServiceKey, ctx.self)

  ctx.system.receptionist ! Receptionist.Subscribe(Worker.WorkerServiceKey, subscriptionAdapter)

Receptionist的登記和清單獲取是以ServiceKey作為關聯的,那么獲取的清單內應該全部是一種型別的actor,只不過它們的地址可能是跨節點的,但它們只能進行同一種運算,從另一個角度說,一項任務是分布在不同節點的actor并行進行運算的,

在上篇討論里提過:如果發布-訂閱機制是在兩個actor之間進行的,那么這兩個actor也需要在規定的資訊交流協議框架下作業:我們必須注意訊息型別,提供必要的訊息型別轉換機制,下面是一個Receptionist登記示范:

object Worker {

  val WorkerServiceKey = ServiceKey[Worker.TransformText]("Worker")

  sealed trait Command
  final case class TransformText(text: String, replyTo: ActorRef[TextTransformed]) extends Command with CborSerializable
  final case class TextTransformed(text: String) extends CborSerializable

  def apply(): Behavior[Command] =
    Behaviors.setup { ctx =>
      // each worker registers themselves with the receptionist
      ctx.log.info("Registering myself with receptionist")
      ctx.system.receptionist ! Receptionist.Register(WorkerServiceKey, ctx.self)

      Behaviors.receiveMessage {
        case TransformText(text, replyTo) =>
          replyTo ! TextTransformed(text.toUpperCase)
          Behaviors.same
      }
    }
}

Receptionist登記比較直接:登記者不需要Receptionist回傳訊息,所以隨便用ctx.self作為訊息的sender,注意TransformText的replyTo: ActorRef[TextTransformed],代表sender是個可以處理TextTransformed訊息型別的actor,實際上,在sender方是通過ctx.ask提供了TextTransformed的型別轉換,

Receptionist.Subscribe需要Receptionist回傳一個actor清單,所以是個request/response模式,那么發送給Receptionist訊息中的replyTo必須是發送者能處理的型別,如下:

  def apply(): Behavior[Event] = Behaviors.setup { ctx =>
    Behaviors.withTimers { timers =>
      // subscribe to available workers
      val subscriptionAdapter = ctx.messageAdapter[Receptionist.Listing] {
        case Worker.WorkerServiceKey.Listing(workers) =>
          WorkersUpdated(workers)
      }
      ctx.system.receptionist ! Receptionist.Subscribe(Worker.WorkerServiceKey, subscriptionAdapter)

...
    }
  

ctx.messageAdapter進行了一個從Receptionist.Listing回傳型別到WorkersUpdated型別的轉換機制登記:從Receptionist回復的List型別會被轉換成WorkersUpdated型別,如下:

...
   Behaviors.receiveMessage {
      case WorkersUpdated(newWorkers) =>
        ctx.log.info("List of services registered with the receptionist changed: {}", newWorkers)

...

另外,上面提過的TextTransformed轉換如下:

          ctx.ask[Worker.TransformText,Worker.TextTransformed](selectedWorker, Worker.TransformText(text, _)) {
            case Success(transformedText) => TransformCompleted(transformedText.text, text)
            case Failure(ex) => JobFailed("Processing timed out", text)
          }

ctx.ask將TextTransformed轉換成TransformCompleted,完整的Behavior定義如下:

object Frontend {

  sealed trait Event
  private case object Tick extends Event
  private final case class WorkersUpdated(newWorkers: Set[ActorRef[Worker.TransformText]]) extends Event
  private final case class TransformCompleted(originalText: String, transformedText: String) extends Event
  private final case class JobFailed(why: String, text: String) extends Event


  def apply(): Behavior[Event] = Behaviors.setup { ctx =>
    Behaviors.withTimers { timers =>
      // subscribe to available workers
      val subscriptionAdapter = ctx.messageAdapter[Receptionist.Listing] {
        case Worker.WorkerServiceKey.Listing(workers) =>
          WorkersUpdated(workers)
      }
      ctx.system.receptionist ! Receptionist.Subscribe(Worker.WorkerServiceKey, subscriptionAdapter)

      timers.startTimerWithFixedDelay(Tick, Tick, 2.seconds)

      running(ctx, IndexedSeq.empty, jobCounter = 0)
    }
  }

  private def running(ctx: ActorContext[Event], workers: IndexedSeq[ActorRef[Worker.TransformText]], jobCounter: Int): Behavior[Event] =
    Behaviors.receiveMessage {
      case WorkersUpdated(newWorkers) =>
        ctx.log.info("List of services registered with the receptionist changed: {}", newWorkers)
        running(ctx, newWorkers.toIndexedSeq, jobCounter)
      case Tick =>
        if (workers.isEmpty) {
          ctx.log.warn("Got tick request but no workers available, not sending any work")
          Behaviors.same
        } else {
          // how much time can pass before we consider a request failed
          implicit val timeout: Timeout = 5.seconds
          val selectedWorker = workers(jobCounter % workers.size)
          ctx.log.info("Sending work for processing to {}", selectedWorker)
          val text = s"hello-$jobCounter"
          ctx.ask[Worker.TransformText,Worker.TextTransformed](selectedWorker, Worker.TransformText(text, _)) {
            case Success(transformedText) => TransformCompleted(transformedText.text, text)
            case Failure(ex) => JobFailed("Processing timed out", text)
          }
          running(ctx, workers, jobCounter + 1)
        }
      case TransformCompleted(originalText, transformedText) =>
        ctx.log.info("Got completed transform of {}: {}", originalText, transformedText)
        Behaviors.same

      case JobFailed(why, text) =>
        ctx.log.warn("Transformation of text {} failed. Because: {}", text, why)
        Behaviors.same

    }

現在我們可以示范用group-router來實作某種跨節點的分布式運算,因為group-router是通過Receptionist來實作對routees管理的,而Receptionist是集群全域的,意味著如果我們在各節點上構建routee,然后向Receptionist登記,就會形成一個跨節點的routee ActorRef清單,如果把任務分配到這個清單上的routee上去運算,應該能實作集群節點負載均衡的效果,下面我們就示范這個loadbalancer,流程很簡單:在一個接入點 (serviceActor)中構建workersRouter,然后3個workerRoutee并向Receptionist登記,把接到的任務分解成子任務逐個發送給workersRouter,每個workerRoutee完成任務后將結果發送給一個聚合器Aggregator,Aggregator在核對完成接收所有workerRoutee回傳的結果后再把匯總結果回傳serverActor,先看看這個serverActor:

object Service {
  val routerServiceKey = ServiceKey[WorkerRoutee.Command]("workers-router")

  sealed trait Command extends CborSerializable

  case class ProcessText(text: String) extends Command {
    require(text.nonEmpty)
  }

  case class WrappedResult(res: Aggregator.Response) extends Command

  def serviceBehavior(workersRouter: ActorRef[WorkerRoutee.Command]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val aggregator = ctx.spawn(Aggregator(), "aggregator")
    val aggregatorRef: ActorRef[Aggregator.Response] = ctx.messageAdapter(WrappedResult)
    Behaviors.receiveMessage {
      case ProcessText(text) =>
        ctx.log.info("******************** received ProcessText command: {} ****************",text)
        val words = text.split(' ').toIndexedSeq
        aggregator ! Aggregator.CountText(words.size, aggregatorRef)
        words.foreach { word =>
          workersRouter ! WorkerRoutee.Count(word, aggregator)
        }
        Behaviors.same
      case WrappedResult(msg) =>
        msg match {
          case Aggregator.Result(res) =>
            ctx.log.info("************** mean length of words = {} **********", res)
        }
        Behaviors.same
    }
  }

  def singletonService(ctx: ActorContext[Command], workersRouter: ActorRef[WorkerRoutee.Command]) = {
    val singletonSettings = ClusterSingletonSettings(ctx.system)
      .withRole("front")
    SingletonActor(
      Behaviors.supervise(
        serviceBehavior(workersRouter)
      ).onFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      )
      , "singletonActor"
    ).withSettings(singletonSettings)
  }

  def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val cluster = Cluster(ctx.system)
    val workersRouter = ctx.spawn(
      Routers.group(routerServiceKey)
        .withRoundRobinRouting(),
      "workersRouter"
    )
    (0 until 3).foreach { n =>
      val routee = ctx.spawn(WorkerRoutee(cluster.selfMember.address.toString), s"work-routee-$n")
      ctx.system.receptionist ! Receptionist.register(routerServiceKey, routee)
    }
    val singletonActor = ClusterSingleton(ctx.system).init(singletonService(ctx, workersRouter))
    Behaviors.receiveMessage {
      case job@ProcessText(text) =>
        singletonActor ! job
        Behaviors.same
    }
  }

}

整體goup-router和routee的構建是在apply()里,并把接到的任務轉發給singletonActor,singletonActor是以serviceBehavior為核心的一個actor,在servceBehavior里把收到的任務分解并分別發送給workersRouter,值得注意的是:serviceBehavior期望接收從Aggregator的回應,它們之間存在request/response模式資訊交流,所以需要Aggregator.Response到WrappedResult的型別轉換機制,還有:子任務是通過workersRoute發送給個workerRoutee的,我們需要各workerRoutee把運算結果返給給Aggregator,所以發送給workersRouter的訊息包含了Aggregator的ActorRef,如:workersRouter ! WorkerRoutee.Count(cnt,aggregatorRef),

Aggregator是個persistentActor, 如下:

 

object Aggregator {
  sealed trait Command
  sealed trait Event extends  CborSerializable
  sealed trait Response

  case class CountText(cnt: Int, replyTo: ActorRef[Response]) extends Command
  case class MarkLength(word: String, len: Int) extends Command
  case class TextCounted(cnt: Int) extends Event
  case class LengthMarked(word: String, len: Int) extends Event
  case class Result(meanWordLength: Double) extends Response

  case class State(expectedNum: Int = 0, lens: List[Int] = Nil)

  var replyTo: ActorRef[Response] = _

  def commandHandler: (State,Command) => Effect[Event,State] = (st,cmd) => {
    cmd match {
      case CountText(cnt,ref) =>
        replyTo = ref
        Effect.persist(TextCounted(cnt))
      case MarkLength(word,len) =>
        Effect.persist(LengthMarked(word,len))
    }
  }
  def eventHandler: (State, Event) => State = (st,ev) => {
    ev match {
      case TextCounted(cnt) =>
        st.copy(expectedNum = cnt, lens = Nil)
      case LengthMarked(word,len) =>
        val state = st.copy(lens = len :: st.lens)
        if (state.lens.size >= state.expectedNum) {
          val meanWordLength = state.lens.sum.toDouble / state.lens.size
          replyTo ! Result(meanWordLength)
          State()
        } else state
    }
  }
  val takeSnapShot: (State,Event,Long) => Boolean = (st,ev,seq) => {
      if (st.lens.isEmpty) {
          if (ev.isInstanceOf[LengthMarked])
            true
          else
            false
      } else
         false
  }
  def apply(): Behavior[Command] = Behaviors.supervise(
    Behaviors.setup[Command] { ctx =>
      EventSourcedBehavior(
        persistenceId = PersistenceId("33","2333"),
        emptyState = State(),
        commandHandler = commandHandler,
        eventHandler = eventHandler
      ).onPersistFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      ).receiveSignal {
        case (state, RecoveryCompleted) =>
          ctx.log.info("**************Recovery Completed with state: {}***************",state)
        case (state, SnapshotCompleted(meta))  =>
          ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
        case (state,RecoveryFailed(err)) =>
          ctx.log.error("*************recovery failed with: {}***************",err.getMessage)
        case (state,SnapshotFailed(meta,err)) =>
          ctx.log.error("***************snapshoting failed with: {}*************",err.getMessage)
      }.snapshotWhen(takeSnapShot)
    }
  ).onFailure(
    SupervisorStrategy
      .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
      .withMaxRestarts(3)
      .withResetBackoffAfter(10.seconds)
  )
}

注意這個takeSnapShot函式:這個函式是在EventSourcedBehavior.snapshotWhen(takeSnapShot)呼叫的,傳入引數是(State,Event,seqenceNr),我們需要對State,Event的當前值進行分析后回傳true代表做一次snapshot,

看看一部分顯示就知道任務已經分配到幾個節點上的routee:

20:06:59.072 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.WorkerRoutee$ - ************** processing [this] on akka://[email protected]:51182 ***********
20:06:59.072 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.WorkerRoutee$ - ************** processing [text] on akka://[email protected]:51182 ***********
20:06:59.072 [ClusterSystem-akka.actor.default-dispatcher-36] INFO com.learn.akka.WorkerRoutee$ - ************** processing [be] on akka://[email protected]:51182 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.learn.akka.WorkerRoutee$ - ************** processing [will] on akka://[email protected]:51173 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-26] INFO com.learn.akka.WorkerRoutee$ - ************** processing [is] on akka://[email protected]:25251 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-13] INFO com.learn.akka.WorkerRoutee$ - ************** processing [the] on akka://[email protected]:51173 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.WorkerRoutee$ - ************** processing [that] on akka://[email protected]:25251 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.WorkerRoutee$ - ************** processing [analyzed] on akka://[email protected]:25251 ***********

這個例子的源代碼如下:

package com.learn.akka

import akka.actor.typed._
import akka.persistence.typed._
import akka.persistence.typed.scaladsl._
import scala.concurrent.duration._
import akka.actor.typed.receptionist._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl._
import akka.cluster.typed.Cluster
import akka.cluster.typed.ClusterSingleton
import akka.cluster.typed.ClusterSingletonSettings
import akka.cluster.typed.SingletonActor
import com.typesafe.config.ConfigFactory

object WorkerRoutee {
  sealed trait Command extends CborSerializable
  case class Count(word: String, replyTo: ActorRef[Aggregator.Command]) extends Command

  def apply(nodeAddress: String): Behavior[Command] = Behaviors.setup {ctx =>
    Behaviors.receiveMessage[Command] {
      case Count(word,replyTo) =>
        ctx.log.info("************** processing [{}] on {} ***********",word,nodeAddress)
        replyTo ! Aggregator.MarkLength(word,word.length)
        Behaviors.same
    }
  }
}
object Aggregator {
  sealed trait Command
  sealed trait Event extends  CborSerializable
  sealed trait Response

  case class CountText(cnt: Int, replyTo: ActorRef[Response]) extends Command
  case class MarkLength(word: String, len: Int) extends Command
  case class TextCounted(cnt: Int) extends Event
  case class LengthMarked(word: String, len: Int) extends Event
  case class Result(meanWordLength: Double) extends Response

  case class State(expectedNum: Int = 0, lens: List[Int] = Nil)

  var replyTo: ActorRef[Response] = _

  def commandHandler: (State,Command) => Effect[Event,State] = (st,cmd) => {
    cmd match {
      case CountText(cnt,ref) =>
        replyTo = ref
        Effect.persist(TextCounted(cnt))
      case MarkLength(word,len) =>
        Effect.persist(LengthMarked(word,len))
    }
  }
  def eventHandler: (State, Event) => State = (st,ev) => {
    ev match {
      case TextCounted(cnt) =>
        st.copy(expectedNum = cnt, lens = Nil)
      case LengthMarked(word,len) =>
        val state = st.copy(lens = len :: st.lens)
        if (state.lens.size >= state.expectedNum) {
          val meanWordLength = state.lens.sum.toDouble / state.lens.size
          replyTo ! Result(meanWordLength)
          State()
        } else state
    }
  }
  val takeSnapShot: (State,Event,Long) => Boolean = (st,ev,seq) => {
      if (st.lens.isEmpty) {
          if (ev.isInstanceOf[LengthMarked])
            true
          else
            false
      } else
         false
  }
  def apply(): Behavior[Command] = Behaviors.supervise(
    Behaviors.setup[Command] { ctx =>
      EventSourcedBehavior(
        persistenceId = PersistenceId("33","2333"),
        emptyState = State(),
        commandHandler = commandHandler,
        eventHandler = eventHandler
      ).onPersistFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      ).receiveSignal {
        case (state, RecoveryCompleted) =>
          ctx.log.info("**************Recovery Completed with state: {}***************",state)
        case (state, SnapshotCompleted(meta))  =>
          ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
        case (state,RecoveryFailed(err)) =>
          ctx.log.error("*************recovery failed with: {}***************",err.getMessage)
        case (state,SnapshotFailed(meta,err)) =>
          ctx.log.error("***************snapshoting failed with: {}*************",err.getMessage)
      }.snapshotWhen(takeSnapShot)
    }
  ).onFailure(
    SupervisorStrategy
      .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
      .withMaxRestarts(3)
      .withResetBackoffAfter(10.seconds)
  )
}
object Service {
  val routerServiceKey = ServiceKey[WorkerRoutee.Command]("workers-router")

  sealed trait Command extends CborSerializable

  case class ProcessText(text: String) extends Command {
    require(text.nonEmpty)
  }

  case class WrappedResult(res: Aggregator.Response) extends Command

  def serviceBehavior(workersRouter: ActorRef[WorkerRoutee.Command]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val aggregator = ctx.spawn(Aggregator(), "aggregator")
    val aggregatorRef: ActorRef[Aggregator.Response] = ctx.messageAdapter(WrappedResult)
    Behaviors.receiveMessage {
      case ProcessText(text) =>
        ctx.log.info("******************** received ProcessText command: {} ****************",text)
        val words = text.split(' ').toIndexedSeq
        aggregator ! Aggregator.CountText(words.size, aggregatorRef)
        words.foreach { word =>
          workersRouter ! WorkerRoutee.Count(word, aggregator)
        }
        Behaviors.same
      case WrappedResult(msg) =>
        msg match {
          case Aggregator.Result(res) =>
            ctx.log.info("************** mean length of words = {} **********", res)
        }
        Behaviors.same
    }
  }

  def singletonService(ctx: ActorContext[Command], workersRouter: ActorRef[WorkerRoutee.Command]) = {
    val singletonSettings = ClusterSingletonSettings(ctx.system)
      .withRole("front")
    SingletonActor(
      Behaviors.supervise(
        serviceBehavior(workersRouter)
      ).onFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      )
      , "singletonActor"
    ).withSettings(singletonSettings)
  }

  def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val cluster = Cluster(ctx.system)
    val workersRouter = ctx.spawn(
      Routers.group(routerServiceKey)
        .withRoundRobinRouting(),
      "workersRouter"
    )
    (0 until 3).foreach { n =>
      val routee = ctx.spawn(WorkerRoutee(cluster.selfMember.address.toString), s"work-routee-$n")
      ctx.system.receptionist ! Receptionist.register(routerServiceKey, routee)
    }
    val singletonActor = ClusterSingleton(ctx.system).init(singletonService(ctx, workersRouter))
    Behaviors.receiveMessage {
      case job@ProcessText(text) =>
        singletonActor ! job
        Behaviors.same
    }
  }

}

object LoadBalance {
  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      startup("compute", 25251)
      startup("compute", 25252)
      startup("compute", 25253)
      startup("front", 25254)
    } else {
      require(args.size == 2, "Usage: role port")
      startup(args(0), args(1).toInt)
    }
  }

  def startup(role: String, port: Int): Unit = {
    // Override the configuration of the port when specified as program argument
    val config = ConfigFactory
      .parseString(s"""
      akka.remote.artery.canonical.port=$port
      akka.cluster.roles = [$role]
      """)
      .withFallback(ConfigFactory.load("cluster-persistence"))

    val frontEnd = ActorSystem[Service.Command](Service(), "ClusterSystem", config)
    if (role == "front") {
      println("*************** sending ProcessText command  ************")
      frontEnd ! Service.ProcessText("this is the text that will be analyzed")
    }

  }

}

cluster-persistence.conf

akka.actor.allow-java-serialization = on
akka {
  loglevel = INFO
  actor {
    provider = cluster
    serialization-bindings {
      "com.learn.akka.CborSerializable" = jackson-cbor
    }
  }
 remote {
    artery {
      canonical.hostname = "127.0.0.1"
      canonical.port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka://[email protected]:25251",
      "akka://[email protected]:25252"]
  }
  # use Cassandra to store both snapshots and the events of the persistent actors
  persistence {
    journal.plugin = "akka.persistence.cassandra.journal"
    snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
  }
}
akka.persistence.cassandra {
  # don't use autocreate in production
  journal.keyspace = "poc"
  journal.keyspace-autocreate = on
  journal.tables-autocreate = on
  snapshot.keyspace = "poc_snapshot"
  snapshot.keyspace-autocreate = on
  snapshot.tables-autocreate = on
}

datastax-java-driver {
  basic.contact-points = ["192.168.11.189:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
}

build.sbt

name := "learn-akka-typed"

version := "0.1"

scalaVersion := "2.13.1"
scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation")

val AkkaVersion = "2.6.5"
val AkkaPersistenceCassandraVersion = "1.0.0"


libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
  "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
  "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
  "ch.qos.logback"     % "logback-classic"             % "1.2.3"
)

 

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/420.html

標籤:Scala

上一篇:kka-typed(5) - cluster:集群節點狀態監視

下一篇:akka-typed(7) - cluster:sharding, 集群分片

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more