在前面的的討論里已經介紹了CQRS讀寫分離模式的一些原理和在akka-typed應用中的實作方式,通過一段時間akka-typed的具體使用對一些經典akka應用的遷移升級,感覺最深的是EvenSourcedBehavior和akka-cluster-sharding了,前者是經典akka中persistenceActor的替換,后者是在原有組件基礎上在使用方面的升級版,兩者都在使用便捷性方面提供了大幅度的提升,在我看來,cluster-sharding是分布式應用的核心,如果能夠比較容易掌握,對開發正確的分布式系統有著莫大的裨益,但這篇討論的重點將會集中在EventSourcedBehavior上,因為它是實作CQRS的關鍵,而CQRS又是大資料應用資料采集(輸入)管理最新的一個重要模式,
EventSourcedBehaviro是akka-typed為event-sourcing事件源模式提供的開發支持,具體的原理和使用方法在前面的博客里都介紹過了,在這篇就不再重復,我們把時間精力放到對event-sourcing的了解和應用上,
可以說,event-sourcing是一種資料庫操作的模式,簡單來說:event-sourcing的作業原理是把對資料庫的操作動作保存起來,不直接對資料庫進行即時更新,而是在一個階段之后通過回溯replay這些動作才對資料庫進行實質的更新,event-sourcing與傳統資料庫操作模式的最大分別就是:event-sourcing對資料庫的更新程序可以重復,在一個既定的原點開始重演所有動作可以得出同樣的結果,即同樣的資料庫狀態,在大資料、高并發應用中最難控制的應該就是用戶操作了,用戶可能在任何時間同時對同一項資料進行更新,通用的傳統方式是通過“鎖”來保證資料的正確性,但“鎖”會給系統帶來更多的麻煩如回應慢甚至系統鎖死,而一旦出現系統鎖死重啟后并無有效辦法恢復資料庫正確狀態,event-sourcing恰恰就能有針對性的解決這些問題,
感覺到,event-sourcing模式應該可以避免對“鎖”的使用:在高并發環境里,event-sourcing系統的每個用戶在任何時間都有可能對資料庫進行操作,但他們并不直接改變資料庫內容,而是將這些對資料庫操作的動作保存起來,因為用戶保存的是各自的動作,互不關聯,所以不需要任何鎖機制,當系統完成一個階段的作業后,從這個階段的起點開始,把所有用戶的動作按發生時間順序重演并對資料庫進行實質的更新,可以看到,這個具體的資料庫更新程序是單一用戶的,所以不需要“鎖”了,階段的起點是由資料庫狀態快照來表示,在完成了這個階段所有動作重演后資料庫狀態一次性更新,整個程序即是CQRS讀寫分離模式了,其中:保存動作為寫部分,動作重演是讀部分,動作重演可以在之后的任何時間進行,因而讀、寫是完全分離的,實際上CQRS就是一個資料庫更新管理的狀態機器:從資料起始狀態到終結狀態的一種程序管理方法,下面就用一個實際的應用設計例子來介紹CQRS在應用系統中的具體使用,
下面討論一個超市收款機pos軟體的例子:
收款流程比較簡單:收款員登錄=>掃碼錄入銷售專案=>錄入折扣=>其它操作=>支付=>打小票
最終結果是在資料庫產生了一張銷售單,即一組交易資料,是實際反映在交易資料庫里的,從CQRS流程來解釋:這組銷售資料在開單時為空,然后在完成所有單據操作后一次性產生,也就是在CQRS模式的讀部分產生的,在這個程序中一直是寫部分的操作,不影響交易資料庫狀態,當然,我們還必須在記憶體里維護一個模擬的狀態來對每項操作進行控制,如:用戶未登錄時不容許任何操作動作,所以必須有個狀態能代表用戶登錄的,而這個狀態應該可以通過動作重演來重現,所以用戶登錄也是一個必須保存的動作,如此,每張銷售單在記憶體里都應該有一個狀態,這個狀態包括了單據狀態和一個動態的交易專案集合,這個專案集合就代表即將產生的資料庫交易資料,下面是單據狀態的定義:
case class VchStates(
opr: String = "", //收款員
num: Int = 1, //當前單號
seq: Int = 1, //當前序號
void: Boolean = false, //取消模式
refd: Boolean = false, //退款模式
susp: Boolean = false, //掛單
canc: Boolean = false, //廢單
due: Boolean = false, //當前余額
su: String = "",
mbr: String = "",
disc: Int = 0, //預設折扣,如:會員折扣
mode: Int = 0 //當前操作流程:0=logOff, 1=LogOn, 2=Payment
) extends CborSerializable { ... }
交易專案是交易資料的直接對應:
case class TxnItem(
txndate: String = LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd"))
, txntime: String = LocalDateTime.now.format(dateTimeFormatter).substring(11)
, opr: String = "" //工號
, num: Int = 0 //銷售單號
, seq: Int = 1 //交易序號
, txntype: Int = TXNTYPE.sales //交易型別
, salestype: Int = SALESTYPE.nul //銷售型別
, qty: Int = 1 //交易數量
, price: Int = 0 //單價(分)
, amount: Int = 0 //碼洋(分)
, disc: Int = 0 //折扣率 (%) 100% = 1
, dscamt: Int = 0 //折扣額:負值 net實洋 = amount + dscamt
, member: String = "" //會員卡號
, code: String = "" //編號(商品、部類編號、賬戶編號、卡號...)
, refnum: String = "" //參考號,如退貨單號
, acct: String = "" //賬號
, dpt: String = "" //部類
) extends CborSerializable {
為了提高系統效率,根據操作動作實時對交易專案進行了更新,如遇到折扣動作時需要更新上一條交易專案的優惠金額等,這也是在讀部分動作重演必須的,因為CQRS的讀部分目的是把正確的交易資料寫到資料庫里,所以,CQRS的寫部分就代表對記憶體中這個交易專案集的動態更新程序,
單據狀態在結單時用EventSourcedBehavior拿了個snapshot作為下一單的起始狀態,銷售中途出現例外退出后可以在上一單狀態快照的基礎上實施動作重演把狀態恢復到出現例外之前,
由于每個階段都可以清晰的用一張銷售單的生命周期來代表,所以在整單操作完成后就可以進行CQRS的讀部分了,操作結束的方式最明顯的是單據完成支付操作了,如下:
case PaymentMade(acct, dpt, num, ref,amount) =>
if (curItem.txntype != TXNTYPE.voided) {
val due = items.totalSales - items.totalPaid
val bal = if (items.totalSales > 0) due - curItem.amount else due + curItem.amount
log.step(s"#${vchState.num} PaymentMade with input totalSales[${items.totalSales}], totalPaid[${items.totalPaid}], txnItems[${items}].")
val vchs = vchState.copy(
seq = vchState.seq + 1,
due = (if ((items.totalPaid.abs + curItem.amount.abs) >= items.totalSales.abs) false else true),
mode = (if (items.totalPaid.abs > 0) 2 else 1)
)
val vItems = items.addItem(curItem.copy(
salestype = SALESTYPE.ttl,
price = due,
amount = curItem.amount,
dscamt = bal
)).txnitems
if (replay) {
Voucher(vchs, vItems)
} else {
if (vchs.due) {
val vch = Voucher(vchs,vItems)
log.step(s"#${vchState.num} PaymentMade with current item: ${vch.items.head}")
vch
}
else {
writerInternal.lastVoucher = Voucher(vchs, vItems)
if (!writerInternal.afterRecovery)
endVoucher(Voucher(vchs,vItems),TXNTYPE.sales)
Voucher(vchs.nextVoucher, List())
}
}
}
else {
log.step(s"#${vchState.num} PaymentMade with current item: $curItem")
Voucher(vchState.copy(
seq = vchState.seq + 1)
, items.addItem(curItem).txnitems)
}
確認了完成支付呼叫endVoucher. endVoucher啟動讀部分reader, 如下:
def endVoucher(voucher: Voucher, txntype: Int)(implicit writerInternal: WriterInternal,pid:Messages.PID) = {
log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with state: ${writerInternal.lastVoucher.header}, txns: ${writerInternal.lastVoucher.items}")
val readerShard = writerInternal.optSharding.get //ClusterSharding(writerInternal.actorContext.system)
val readerRef = readerShard.entityRefFor(POSReader.EntityKey, s"$pid.shopId:$pid.posId")
val eseq = EventSourcedBehavior.lastSequenceNumber(writerInternal.optContext.get)
val bseq = eseq - writerInternal.listOfActions.size + 1
log.step(s"#${writerInternal.lastVoucher.header.num } sending PerformRead(${pid.shopid}, ${pid.posid},${writerInternal.lastVoucher.header.num},${writerInternal.lastVoucher.header.opr},$bseq,$eseq,$txntype,${writerInternal.expurl},${writerInternal.expacct},${writerInternal.exppass}) ...")
// log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with actions: ${writerInternal.listOfActions}")
readerRef ! Messages.PerformRead(pid.shopid, pid.posid,writerInternal.lastVoucher.header.num,writerInternal.lastVoucher.header.opr,bseq,eseq,txntype,writerInternal.expurl,writerInternal.expacct,writerInternal.exppass)
writerInternal.clearListOfAction()
log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with actions: ${writerInternal.listOfActions}")
}
reader是在一個sharding上即時構建的一個actor,這個actor的主要功能就是從journal里讀出這張單所有動作進行重演得出交易專案集后寫進交易資料庫:
def readActions(ctx: ActorContext[Command],vchnum: Int, cshr: String, startSeq: Long, endSeq: Long, trace: Boolean, nodeAddress: String, shopId: String, posId: String, txntype: Int): Future[List[TxnItem]] = {
implicit val classicSystem = ctx.system.toClassic
implicit val ec = classicSystem.dispatcher
implicit var vchState = VchStates().copy(num = vchnum, opr = cshr)
implicit var vchItems = VchItems()
implicit var curTxnItem = TxnItem()
implicit val pid = PID(shopId,posId)
implicit val writerInternal = new Messages.WriterInternal(nodeAddress = nodeAddress, pid = pid, trace=trace)
log.stepOn = trace
log.step(s"POSReader: readActions($vchnum,$cshr,$startSeq,$endSeq,$trace,$nodeAddress,$shopId,$posId), txntype=$txntype")
def buildVoucher(actions: List[Any]): List[TxnItem] = {
log.step(s"POSReader: read actions: $actions")
val (voidtxns,onlytxns) = actions.asInstanceOf[Seq[Action]].pickOut(_.isInstanceOf[Voided])
val listOfActions = actions.reverse zip (LazyList from 1) //zipWithIndex
listOfActions.foreach { case (txn,idx) =>
txn.asInstanceOf[Action] match {
case ti@_ =>
curTxnItem = EventHandlers.buildTxnItem(ti.asInstanceOf[Action],vchState).copy(opr=cshr)
if (!ti.isInstanceOf[Voided]) {
if (voidtxns.exists(a => a.asInstanceOf[Voided].seq == idx)) {
curTxnItem = curTxnItem.copy(txntype = TXNTYPE.voided, opr = cshr)
log.step(s"POSReader: voided txnitem: $curTxnItem")
}
}
val vch = EventHandlers.updateState(ti.asInstanceOf[Action],vchState,vchItems,curTxnItem,true)
vchState = vch.header
vchItems = vch.txnItems
log.step(s"POSReader: built txnitem: ${vchItems.txnitems.head}")
}
}
log.step(s"POSReader: voucher built with state: $vchState, items: ${vchItems.txnitems}")
vchItems.txnitems
}
val query =
PersistenceQuery(classicSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
implicit val session = CassandraSessionRegistry(classicSystem).sessionFor("alpakka.cassandra")
// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
query.currentEventsByPersistenceId(s"${pid.shopid}:${pid.posid}", startSeq, endSeq)
// materialize stream, consuming events
val readActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }
for {
lst1 <- readActions //read list from Source
lstTxns <- if (lst1.length < (endSeq -startSeq)) //if imcomplete list read again
readActions
else FastFuture.successful(lst1)
items <- FastFuture.successful( buildVoucher(lstTxns) )
_ <- JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items)
_ <- session.close(ec)
} yield items
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/424.html
標籤:Scala
上一篇:Scala 中的約定
