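alpakka is an open-source Scala/Java project built on akka-streams, the stream-processing toolkit. It provides connectors to a variety of data sources so that their data can be processed in akka-streams. alpakka-kafka is the alpakka project's Kafka connector. For us, this means we can use alpakka-kafka to connect to Kafka and use the functionality Kafka provides. Seen from another angle, alpakka-kafka is a Scala toolkit that implements Kafka functionality with akka-streams.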
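alpakka-kafka provides Kafka's two core functions, producer and consumer. The producer writes data from akka-streams into Kafka, and the consumer reads data from Kafka and feeds it into akka-streams. Integrating Kafka through akka-streams typically comes up in business integration. Business A produces operation commands and writes them to Kafka, Kafka delivers them to business B, and B reads the commands from Kafka and carries out the corresponding operations. For example, suppose there are two business modules, transaction-record management and inventory management. The former writes transaction records to Kafka, and the latter reads those records from Kafka and updates the related inventory quantities. Note that the two modules operate independently of each other. In alpakka, the actual business operation is essentially data processing (transform) inside akka-streams. This is a typical CQRS pattern: the writing and reading sides are unrelated. The writer does not care who the audience is or how the data will be used, and the reader does not care who the writer is. Here the writing and reading sides correspond to Kafka's producer and consumer respectively.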
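This post first introduces alpakka-kafka's producer functionality and how to use it. As mentioned above, alpakka implements the Kafka producer with akka-streams. The producers alpakka provides are therefore akka-streams components, and they can be combined with other akka-streams components into larger akka-streams graphs. Building a producer first requires a few supporting pieces: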
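1. producer-settings: the akka.kafka.producer section of alpakka-kafka's reference.conf provides default producer settings that are sufficient for basic operation. Users can adjust them through Typesafe Config configuration files.
2. de/serializers: the Kafka client library (org.apache.kafka.common.serialization) provides StringSerializer and StringDeserializer for the String type, and these can be used directly.
3. bootstrap servers: a comma-separated list of Kafka cluster node addresses in host:port form, e.g. "localhost:9092" or "broker1:9092,broker2:9092".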
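StringSerializer covers plain text. When a message value is a domain object, one option is to implement Kafka's Serializer interface yourself. The sketch below uses a hypothetical InventoryRecord case class and encodes it as a simple "sku,qty" string. It assumes kafka-clients 2.1 or later, where configure() and close() have default implementations. A real project would more likely use JSON, Avro or Protobuf.

```scala
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.serialization.Serializer

// Hypothetical business record, used only for illustration
final case class InventoryRecord(sku: String, qty: Int)

// Encodes an InventoryRecord as "sku,qty" in UTF-8; a null value stays null.
// Assumes kafka-clients >= 2.1, where configure() and close() have default implementations.
class InventoryRecordSerializer extends Serializer[InventoryRecord] {
  override def serialize(topic: String, data: InventoryRecord): Array[Byte] =
    if (data == null) null
    else s"${data.sku},${data.qty}".getBytes(StandardCharsets.UTF_8)
}
```

Such a serializer is passed in the same place as StringSerializer. For example, `ProducerSettings(config, new StringSerializer, new InventoryRecordSerializer)` yields a `ProducerSettings[String, InventoryRecord]`.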
Here is a concrete example:
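import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.common.serialization.StringSerializer
implicit val system = ActorSystem("kafka_sys")
val bootstrapServers = "localhost:9092"
// the akka.kafka.producer section comes from alpakka-kafka's reference.conf
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
  ProducerSettings(config, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)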
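Here the ActorSystem is used only to read the configuration from the .conf files; no akka-streams component is involved yet. The akka.kafka.producer section already has defaults in alpakka-kafka's reference.conf, so it does not need to be redefined in application.conf.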
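If some defaults do need changing, one option is to layer overrides on top of that section. The sketch below assumes a recent alpakka-kafka whose akka.kafka.producer section includes close-timeout and a kafka-clients sub-section, whose entries are passed straight to the underlying Kafka producer. The values chosen for acks, close-timeout and linger.ms are only examples. It uses ConfigFactory.load(), which reads application.conf with reference.conf as fallback (the same default sources an ActorSystem uses), so settings can be built without starting an ActorSystem. The same overrides could equally be written into application.conf.

```scala
import akka.kafka.ProducerSettings
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringSerializer

object ProducerConfig {
  // Example overrides; keys not listed here fall back to reference.conf
  private val overrides: Config = ConfigFactory.parseString(
    """close-timeout = 30s
      |kafka-clients {
      |  acks = "all"
      |}
      |""".stripMargin)

  def settings(bootstrapServers: String): ProducerSettings[String, String] = {
    // application.conf merged over reference.conf, as an ActorSystem would see it
    val base = ConfigFactory.load().getConfig("akka.kafka.producer")
    ProducerSettings(overrides.withFallback(base), new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)
      // a single Kafka client property can also be set in code
      .withProperty("linger.ms", "5")
  }
}
```

Keeping overrides in configuration rather than in code lets each environment tune the producer without recompiling. withProperty is handy for the occasional value that is computed at runtime.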
alpakka-kafka also provides a very basic producer that is not an akka-streams component: SendProducer. The following demonstrates how SendProducer is used and what it does:
import akka.actor.ActorSystem
import akka.kafka.scaladsl.SendProducer
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import akka.kafka._
import org.apache.kafka.common.serialization._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
object SendProducerDemo extends App {
  implicit val system = ActorSystem("kafka_sys")
  implicit val executionContext = system.dispatcher
  val bootstrapServers = "localhost:9092"
  val config = system.settings.config.getConfig("akka.kafka.producer")
  val producerSettings =
    ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)
  val producer = SendProducer(producerSettings)
  val topic = "greatings"
  // one send per number; each send returns a Future[RecordMetadata]
  val lstfut: Seq[Future[RecordMetadata]] =
    (100 to 200).reverse
      .map(_.toString)
      .map(value => new ProducerRecord[String, String](topic, s"hello-$value"))
      .map(msg => producer.send(msg))
  val futlst = Future.sequence(lstfut)
  Await.result(futlst, 2.seconds)
  scala.io.StdIn.readLine()
  // close() returns a Future[Done]; wait for it before terminating the actor system
  Await.result(producer.close(), 10.seconds)
  system.terminate()
}
以上示范用sendProducer向kafka寫入100條hello訊息,使用的是集合遍歷,沒有使用akka-streams的Source,為了檢驗具體效果,我們可以使用kafka提供的一些手工指令,如下:
\w> ./kafka-topics --create --topic greatings --bootstrap-server localhost:9092
Created topic greatings.
\w> ./kafka-console-consumer --topic greatings --bootstrap-server localhost:9092
hello-100
hello-101
hello-102
hello-103
hello-104
hello-105
hello-106
...
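Besides send, SendProducer also has sendEnvelope. It accepts a ProducerMessage.Envelope (covered later in this post) and completes with the corresponding Results, so a value attached to the message comes back together with Kafka's metadata. A minimal sketch, assuming the same alpakka-kafka version as the demo above and a broker at localhost:9092. The passThrough value here is simply the message number.

```scala
import akka.actor.ActorSystem
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.SendProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SendEnvelopeDemo {
  // Wraps one record in an Envelope; the passThrough (here the number) travels with it
  def envelope(topic: String, n: Int): ProducerMessage.Envelope[String, String, Int] =
    ProducerMessage.single(new ProducerRecord[String, String](topic, s"hello-$n"), n)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("kafka_sys")
    implicit val ec: ExecutionContext = system.dispatcher
    val settings = ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
    val producer = SendProducer(settings)
    try {
      val results: Future[Seq[ProducerMessage.Results[String, String, Int]]] =
        Future.sequence((1 to 5).map(n => producer.sendEnvelope(envelope("greatings", n))))
      Await.result(results, 5.seconds).foreach {
        case ProducerMessage.Result(metadata, message) =>
          println(s"#${message.passThrough} -> partition ${metadata.partition()}, offset ${metadata.offset()}")
        case other =>
          println(s"unexpected result: $other")
      }
    } finally {
      // close() returns a Future; wait for it before shutting down the actor system
      Await.result(producer.close(), 10.seconds)
      system.terminate()
    }
  }
}
```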
Since a producer represents the writing side, in akka-streams it takes the role of a Sink or Flow component. The following example demonstrates plainSink, a producer Sink component:
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl._
import akka.kafka._
import akka.stream.scaladsl._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization._
import scala.concurrent._
import scala.concurrent.duration._
object plain_sink extends App {
  implicit val system = ActorSystem("kafka_sys")
  val bootstrapServers = "localhost:9092"
  val config = system.settings.config.getConfig("akka.kafka.producer")
  val producerSettings =
    ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)
  implicit val executionContext = system.dispatcher
  val topic = "greatings"
  val done: Future[Done] =
    Source(1 to 100)
      .map(_.toString)
      .map(value => new ProducerRecord[String, String](topic, s"hello-$value"))
      .runWith(Producer.plainSink(producerSettings))
  Await.ready(done, 3.seconds)
  scala.io.StdIn.readLine()
  system.terminate()
}
This is a typical akka-streams application, in which Producer.plainSink is an akka-streams Sink component.
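Because plainSink is an ordinary Sink, it composes with any other stream stages. The sketch below uses a hypothetical StockChange type and topic name "inventory-changes". It keys every record by SKU, so with Kafka's default partitioner all changes for the same SKU land in the same partition and keep their relative order. Instead of blocking with Await, it reacts to the materialized Future[Done]. A broker at localhost:9092 is assumed.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object KeyedPlainSink {
  // Hypothetical business event: a quantity change for one SKU
  final case class StockChange(sku: String, delta: Int)

  // Key = SKU, value = delta as text; same key -> same partition with the default partitioner
  def toRecord(topic: String, c: StockChange): ProducerRecord[String, String] =
    new ProducerRecord[String, String](topic, c.sku, c.delta.toString)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("kafka_sys")
    implicit val ec: ExecutionContext = system.dispatcher
    val settings = ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
    val changes = List(StockChange("A-1", 5), StockChange("B-2", -1), StockChange("A-1", -2))
    val done: Future[Done] = Source(changes)
      .map(toRecord("inventory-changes", _))
      .runWith(Producer.plainSink(settings))
    // React to completion instead of blocking the calling thread
    done.onComplete {
      case Success(_) => println("all records written"); system.terminate()
      case Failure(e) => println(s"stream failed: ${e.getMessage}"); system.terminate()
    }
  }
}
```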
以上兩個示范都涉及到構建一個ProducerRecord型別并將之寫入kafka,ProducerRecord是一個基本的kafka訊息型別:
public ProducerRecord(String topic, K key, V value) {
    this(topic, null, null, key, value, null);
}
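kafka-clients also offers other ProducerRecord constructors. For illustration, the sketch below builds a value-only record, a keyed record, and one with an explicit partition, timestamp and header. The topic, header name, timestamp and values are arbitrary examples.

```scala
import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.RecordHeader

object RecordExamples {
  val topic = "greatings"
  // Value only: the key is null, so the partitioner chooses the partition
  val valueOnly = new ProducerRecord[String, String](topic, "hello")
  // Key and value: the partition is derived from the key
  val keyed = new ProducerRecord[String, String](topic, "k1", "hello")
  // Explicit partition, timestamp and headers (partition and timestamp are boxed Java types)
  val detailed = new ProducerRecord[String, String](
    topic,
    Integer.valueOf(0),
    java.lang.Long.valueOf(1700000000000L),
    "k1",
    "hello",
    java.util.Collections.singletonList[Header](
      new RecordHeader("source", "demo".getBytes(StandardCharsets.UTF_8)))
  )
}
```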
In the three-argument constructor, topic is a String, while key and value have the generic types K and V. On top of ProducerRecord, alpakka-kafka adds a richer message type, ProducerMessage.Envelope:
sealed trait Envelope[K, V, +PassThrough] {
def passThrough: PassThrough
def withPassThrough[PassThrough2](value: PassThrough2): Envelope[K, V, PassThrough2]
}
final case class Message[K, V, +PassThrough](
record: ProducerRecord[K, V],
passThrough: PassThrough
) extends Envelope[K, V, PassThrough] {
override def withPassThrough[PassThrough2](value: PassThrough2): Message[K, V, PassThrough2] =
copy(passThrough = value)
}
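ProducerMessage.Envelope adds a PassThrough parameter for carrying extra metadata along with the message. alpakka-kafka stream components use this type as their stream element. They eventually turn each envelope into zero, one or more ProducerRecords written to Kafka, as these helper functions show: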
object EventMessages {
// one envelope -> one ProducerRecord
def createMessage[KeyType,ValueType,PassThroughType](
topic: String,
key: KeyType,
value: ValueType,
passThrough: PassThroughType): ProducerMessage.Envelope[KeyType,ValueType,PassThroughType] = {
val single = ProducerMessage.single(
new ProducerRecord[KeyType,ValueType](topic,key,value),
passThrough
)
single
}
// one envelope -> several ProducerRecords (one per topic)
def createMultiMessage[KeyType,ValueType,PassThroughType] (
topics: List[String],
key: KeyType,
value: ValueType,
passThrough: PassThroughType): ProducerMessage.Envelope[KeyType,ValueType,PassThroughType] = {
import scala.collection.immutable
val msgs = topics.map { topic =>
new ProducerRecord(topic,key,value)
}.toSeq
val multi = ProducerMessage.multi(
msgs,
passThrough
)
multi
}
// pass only the passThrough metadata: nothing is written to Kafka, so topic, key and value are ignored
def createPassThroughMessage[KeyType,ValueType,PassThroughType](
topic: String,
key: KeyType,
value: ValueType,
passThrough: PassThroughType): ProducerMessage.Envelope[KeyType,ValueType,PassThroughType] = {
ProducerMessage.passThrough(passThrough)
}
}
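To see what each factory produces without a running broker, the sketch below builds the three envelope kinds directly with ProducerMessage.single, multi and passThrough, then pattern-matches on them. The topic names and the Int passThrough values are arbitrary.

```scala
import akka.kafka.ProducerMessage
import org.apache.kafka.clients.producer.ProducerRecord

object EnvelopeKinds {
  val single: ProducerMessage.Envelope[String, String, Int] =
    ProducerMessage.single(new ProducerRecord[String, String]("t1", "k", "v"), 1)
  // One envelope fanned out to two topics; flexiFlow emits a single MultiResult for it
  val multi: ProducerMessage.Envelope[String, String, Int] =
    ProducerMessage.multi(
      List(
        new ProducerRecord[String, String]("t1", "k", "v"),
        new ProducerRecord[String, String]("t2", "k", "v")
      ),
      2
    )
  // Nothing is written to Kafka; only the passThrough value flows downstream
  val passOnly: ProducerMessage.Envelope[String, String, Int] =
    ProducerMessage.passThrough[String, String, Int](3)

  // Envelope is sealed, so these three cases are exhaustive
  def describe(e: ProducerMessage.Envelope[String, String, Int]): String = e match {
    case ProducerMessage.Message(record, pt)       => s"single to ${record.topic()} ($pt)"
    case ProducerMessage.MultiMessage(records, pt) => s"${records.size} records ($pt)"
    case ProducerMessage.PassThroughMessage(pt)    => s"pass-through only ($pt)"
  }
}
```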
flexiFlow is an alpakka-kafka Flow component. It takes ProducerMessage.Envelope elements in and emits Results:
def flexiFlow[K, V, PassThrough](
settings: ProducerSettings[K, V]
): Flow[Envelope[K, V, PassThrough], Results[K, V, PassThrough], NotUsed] = { ... }
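Results is a sealed trait with three subtypes, Result, MultiResult and PassThroughResult, matching the three envelope kinds. Result, for example, is defined as: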
final case class Result[K, V, PassThrough] private (
metadata: RecordMetadata,
message: Message[K, V, PassThrough]
) extends Results[K, V, PassThrough] {
def offset: Long = metadata.offset()
def passThrough: PassThrough = message.passThrough
}
In other words, flexiFlow emits the status metadata that Kafka returns after each write. Here is a usage example of flexiFlow:
import akka.kafka.ProducerMessage._
import akka.actor.ActorSystem
import akka.kafka.scaladsl._
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent._
import scala.concurrent.duration._
object flexi_flow extends App {
implicit val system = ActorSystem("kafka_sys")
val bootstrapServers = "localhost:9092"
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val topic = "greatings"
val done = Source(1 to 100)
.map { number =>
val value = number.toString
EventMessages.createMessage(topic,"key",value,number)
}
.via(Producer.flexiFlow(producerSettings))
.map {
case ProducerMessage.Result(metadata, ProducerMessage.Message(record, passThrough)) =>
s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"
case ProducerMessage.MultiResult(parts, passThrough) =>
parts
.map {
case MultiResultPart(metadata, record) =>
s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"
}
.mkString(", ")
case ProducerMessage.PassThroughResult(passThrough) =>
s"passed through"
}
.runWith(Sink.foreach(println(_)))
Await.ready(done,3.seconds)
scala.io.StdIn.readLine()
system.terminate()
}
object EventMessages {
def createMessage[KeyType,ValueType,PassThroughType](
topic: String,
key: KeyType,
value: ValueType,
passThrough: PassThroughType): ProducerMessage.Envelope[KeyType,ValueType,PassThroughType] = {
val single = ProducerMessage.single(
new ProducerRecord[KeyType,ValueType](topic,key,value),
passThrough
)
single
}
def createMultiMessage[KeyType,ValueType,PassThroughType] (
topics: List[String],
key: KeyType,
value: ValueType,
passThrough: PassThroughType): ProducerMessage.Envelope[KeyType,ValueType,PassThroughType] = {
import scala.collection.immutable
val msgs = topics.map { topic =>
new ProducerRecord(topic,key,value)
}.toSeq
val multi = ProducerMessage.multi(
msgs,
passThrough
)
multi
}
def createPassThroughMessage[KeyType,ValueType,PassThroughType](
topic: String,
key: KeyType,
value: ValueType,
passThrough: PassThroughType): ProducerMessage.Envelope[KeyType,ValueType,PassThroughType] = {
ProducerMessage.passThrough(passThrough)
}
}
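A common use of passThrough is to carry an identifier of the originating business object. Once Kafka confirms the write, the result can then be tied back to that object. The sketch below assumes a hypothetical OrderId type, a topic named "orders" and a broker at localhost:9092. It collects the confirmed (id, offset) pairs with Sink.seq.

```scala
import akka.actor.ActorSystem
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.Await
import scala.concurrent.duration._

object ConfirmWrites {
  // Hypothetical identifier of the business object behind each message
  final case class OrderId(value: Long)

  def toEnvelope(topic: String, id: OrderId, payload: String): ProducerMessage.Envelope[String, String, OrderId] =
    ProducerMessage.single(new ProducerRecord[String, String](topic, id.value.toString, payload), id)

  // Extract (id, offset) from a single-record result; other result kinds yield None
  def confirmation(r: ProducerMessage.Results[String, String, OrderId]): Option[(OrderId, Long)] = r match {
    case ProducerMessage.Result(metadata, message) => Some(message.passThrough -> metadata.offset())
    case _                                         => None
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("kafka_sys")
    val settings = ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
    val confirmed = Source(1L to 10L)
      .map(n => toEnvelope("orders", OrderId(n), s"order-$n"))
      .via(Producer.flexiFlow(settings))
      .map(confirmation)
      .collect { case Some(pair) => pair }
      .runWith(Sink.seq)
    Await.result(confirmed, 10.seconds).foreach { case (id, offset) => println(s"$id written at offset $offset") }
    system.terminate()
  }
}
```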
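Besides writing business events or commands to Kafka, a producer stage can also commit the offset back to Kafka, i.e. the position up to which the source messages have been read. alpakka-kafka producers therefore fall into two kinds. plainSink and flexiFlow, demonstrated above, only write business data to Kafka. Others, such as committableSink, additionally commit the consumer offset of each message, for example: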
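// consumerSettings, producerSettings, committerSettings, topic1, topic2 and targetTopic are assumed
// to be defined elsewhere; drainAndShutdown() also needs an implicit ExecutionContext in scope
val control =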
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.map { msg =>
ProducerMessage.single(
new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
msg.committableOffset
)
}
.toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
.run()
control.drainAndShutdown()
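As shown above, committableSource reads business messages from Kafka together with their read position, committableOffset. Producer.committableSink then writes the messages to targetTopic and commits the corresponding offsets back to Kafka.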
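Filling in the settings and topic names gives a self-contained sketch of the same consume-transform-produce pipeline. The group id, the target topic "greatings-upper" and the uppercase transformation are hypothetical. The sketch assumes a broker at localhost:9092 and alpakka-kafka 2.1 or later (where toMat(...)(DrainingControl.apply) type-checks, as in the snippet above). The offset is committed only after the record has been produced. If the application crashes, some messages may therefore be produced again after a restart (at-least-once delivery).

```scala
import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import scala.concurrent.ExecutionContext

object ForwardAndCommit {
  // Hypothetical transformation applied to each forwarded value
  def transform(value: String): String = value.toUpperCase

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("kafka_sys")
    implicit val ec: ExecutionContext = system.dispatcher
    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("forwarder") // hypothetical consumer group
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
    val committerSettings = CommitterSettings(system)
    val control =
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics("greatings"))
        .map { msg =>
          ProducerMessage.single(
            new ProducerRecord[String, String]("greatings-upper", msg.record.key(), transform(msg.record.value())),
            msg.committableOffset // committed by committableSink after the record is produced
          )
        }
        .toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
        .run()
    // Run until Enter is pressed, then stop consuming, finish in-flight work and shut down
    scala.io.StdIn.readLine()
    control.drainAndShutdown().onComplete(_ => system.terminate())
  }
}
```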
The next post will cover the consumer in detail.
Please credit the source when reposting. Article link: https://www.uj5u.com/houduan/261626.html
