alpakka-kafka-consumer的功能描述很簡單:向kafka訂閱某些topic然后把讀到的訊息傳給akka-streams做業務處理,在kafka-consumer的實作細節上,為了達到高可用、高吞吐的目的,topic又可用劃分出多個磁區partition,磁區是分布在kafka集群節點broker上的,由于一個topic可能有多個partition,對應topic就會有多個consumer,形成一個consumer組,共用統一的groupid,一個partition只能對應一個consumer、而一個consumer負責從多個partition甚至多個topic讀取訊息,kafka會根據實際情況將某個partition分配給某個consumer,即partition-assignment,所以一般來說我們會把topic訂閱與consumer-group掛鉤,這個可以在典型的ConsumerSettings證實:
val system = ActorSystem("kafka-sys")
val config = system.settings.config.getConfig("akka.kafka.consumer")
val bootstrapServers = "localhost:9092"
val consumerSettings =
ConsumerSettings(config, new StringDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
我們先用一個簡單的consumer plainSource試試把前一篇示范中producer寫入kafka的訊息讀出來:
import akka.actor.ActorSystem
import akka.kafka._
import akka.kafka.scaladsl._
import akka.stream.{RestartSettings, SystemMaterializer}
import akka.stream.scaladsl.{Keep, RestartSource, Sink}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent._
import scala.concurrent.duration._
object plain_source extends App {
val system = ActorSystem("kafka-sys")
val config = system.settings.config.getConfig("akka.kafka.consumer")
implicit val mat = SystemMaterializer(system).materializer
implicit val ec: ExecutionContext = mat.executionContext
val bootstrapServers = "localhost:9092"
val consumerSettings =
ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val subscription = Subscriptions.topics("greatings")
Consumer.plainSource(consumerSettings, subscription)
.runWith(Sink.foreach(msg => println(msg.value())))
scala.io.StdIn.readLine()
system.terminate()
}
以上我們沒有對讀出的訊息做任何的業務處理,直接顯示出來,注意每次都會從頭完整讀出,因為設定了 .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),也就是kafka-consumer的auto.offset.reset = "earliest" ,那么如果需要用讀出的資料進行業務處理的話,每次開始運行應用時都會重復從頭執行這些業務,所以需要某種機制來標注已經讀取的訊息,也就是需要記住當前讀取位置offset,
Consumer.plainSource輸入ConsumerRecord型別:
public ConsumerRecord(String topic,
int partition,
long offset,
K key,
V value) {
this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
}
這個ConsumerRecord型別里包括了offset,用戶可以自行commit這個位置引數,也就是說用戶可以選擇把這個offset存盤在kafka或者其它的資料庫里,說到commit-offset,offset管理機制在kafka-consumer業務應用中應該屬于關鍵技術,kafka-consumer方面的業務流程可以簡述為:從kafka讀出業務指令,執行指令并更新業務狀態,然后再從kafka里讀出下一批指令,為了實作業務狀態的準確性,無論錯過一些指令或者重復執行一些指令都是不能容忍的,所以,必須準確的標記每次從kafka讀取資料后的指標位置,commit-offset,但是,如果讀出資料后即刻commit-offset,那么在執行業務指令時如果系統發生例外,那么下次再從標注的位置開始讀取資料時就會越過一批業務指令,這種情況稱為at-most-once,即可能會執行一次,但絕不會重復,另一方面:如果在成功改變業務狀態后再commit-offset,那么,一旦執行業務指令時發生例外而無法進行commit-offset,下次讀取的位置將使用前一次的標注位置,就會出現重復改變業務狀態的情況,這種情況稱為at-least-once,即一定會執行業務指令,但可能出現重復更新情況,如果涉及資金、庫存等業務,兩者皆不可接受,只能采用exactly-once保證一次這種模式了,不過也有很多業務要求沒那么嚴格,比如某個網站統計點擊量,只需個約莫數,無論at-least-once,at-most-once都可以接受,
kafka-consumer-offset是一個Long型別的值,可以存放在kafka內部或者外部的資料庫里,如果選擇在kafka內部存盤offset, kafka配置里可以設定按時間間隔自動進行位置標注,自動把當前offset存入kafka里,當我們在上面例子的ConsumerSettings里設定自動commit后,多次重新運行就不會出現重復資料的情況了:
val consumerSettings =
ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId("group1")
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") //自動commit
.withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000") //自動commit間隔
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
alpakka-kafka提供了Committer型別,是akka-streams的Sink或Flow組件,負責把offset寫入kafka,如果用Committer的Sink或Flow就可以按用戶的需要控制commit-offset的發生時間,如下面這段示范代碼:
val committerSettings = CommitterSettings(system)
val control: DrainingControl[Done] =
Consumer
.committableSource(consumerSettings, Subscriptions.topics("greatings"))
.mapAsync(10) { msg =>
BusinessLogic.runBusiness(msg.record.key, msg.record.value)
.map(_ => msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(DrainingControl.apply)
.run()
control.drainAndShutdown();
scala.io.StdIn.readLine()
system.terminate()
}
object BusinessLogic {
def runBusiness(key: String, value: String): Future[Done] = Future.successful(Done)
}
上面這個例子里BusinessLogic.runBusiess()模擬一段業務處理代碼,也就是說完成了業務處理之后就用Committer.sink進行了commit-offset,這是一種at-least-once模式,因為runBusiness可能會發生例外失敗,所以有可能出現重復運算的情況,Consumer.committableSource輸出CommittableMessage:
def committableSource[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[CommittableMessage[K, V], Control] =
Source.fromGraph(new CommittableSource[K, V](settings, subscription))
final case class CommittableMessage[K, V](
record: ConsumerRecord[K, V],
committableOffset: CommittableOffset
)
@DoNotInherit sealed trait CommittableOffset extends Committable {
def partitionOffset: PartitionOffset
}
Committer.sink接受輸入Committable型別并將之寫入kafka,上游的CommittableOffset 繼承了 Committable,另外,這個DrainingControl型別結合了Control型別和akka-streams終結信號可以有效控制整個consumer-streams安全終結,
alpakka-kafka還有一個atMostOnceSource,這個Source組件每讀一條資料就會立即自動commit-offset:
def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
committableSource[K, V](settings, subscription).mapAsync(1) { m =>
m.committableOffset.commitInternal().map(_ => m.record)(ExecutionContexts.sameThreadExecutionContext)
}
可以看出來,atMostOnceSource在輸出ConsumerRecord之前就進行了commit-offset,atMostOnceSource的一個具體使用示范如下:
import scala.collection.immutable
val control: DrainingControl[immutable.Seq[Done]] =
Consumer
.atMostOnceSource(consumerSettings, Subscriptions.topics("greatings"))
.mapAsync(1)(record => BussinessLogic.runBusiness(record.key, record.value()))
.toMat(Sink.seq)(DrainingControl.apply)
.run()
control.drainAndShutdown();
scala.io.StdIn.readLine()
system.terminate()
所以,使用atMostOnceSource后是不需要任何Committer來進行commit-offset的了,值得注意的是atMostOnceSource是對每一條資料進行位置標注的,所以運行效率必然會受到影響,如果要求不是那么嚴格的話還是啟動自動commit比較合適,
對于任何型別的交易業務系統來說,無論at-least-once或at-most-once都是不可接受的,只有exactly-once才妥當,實作exactly-once的其中一個方法是把offset與業務資料存放在同一個外部資料庫中,如果在外部資料庫通過事務處理機制(transaction-processing)把業務狀態更新與commit-offset放在一個事務單元中同進同退就能實作exactly-once模式了,下面這段是官方檔案給出的一個示范:
val db = new mongoldb
val control = db.loadOffset().map { fromOffset =>
Consumer
.plainSource(
consumerSettings,
Subscriptions.assignmentWithOffset(
new TopicPartition(topic, /* partition = */ 0) -> fromOffset
)
)
.mapAsync(1)(db.businessLogicAndStoreOffset)
.toMat(Sink.seq)(DrainingControl.apply)
.run()
}
class mongoldb {
def businessLogicAndStoreOffset(record: ConsumerRecord[String, String]): Future[Done] = // ...
def loadOffset(): Future[Long] = // ...
}
在上面這段代碼里:db.loadOffset()從mongodb里取出上一次讀取位置,回傳Future[Long],然后用Subscriptions.assignmentWithOffset把這個offset放在一個tuple (TopicPartition,Long)里,TopicPartition定義如下:
public TopicPartition(String topic, int partition) {
this.partition = partition;
this.topic = topic;
}
這樣Consumer.plainSource就可以從offset開始讀取資料了,plainSource輸出ConsumerRecord型別:
public ConsumerRecord(String topic,
int partition,
long offset,
K key,
V value) {
this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
}
這里面除業務指令value外還提供了當前offset,這些已經足夠在businessLogicAndStoreOffset()里運算一個單獨的business+offset事務了(transaction),
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/262371.html
標籤:Scala
上一篇:Python安裝教程
下一篇:Python的回圈結構,也簡單!
