我正在嘗試在 ksqldb 之上構建一個應用程式。
假設我將有一個簡單的生產者:
package main
import (
"fmt"
"github.com/rmoff/ksqldb-go"
"net/http"
)
var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()
func init() {
offset := `SET 'auto.offset.reset' = 'earliest';`
if err := client.Execute(offset); err != nil {
panic(err)
}
s1 := `
CREATE OR REPLACE STREAM userEvents (
userId VARCHAR KEY,
eventType VARCHAR
)
WITH (
kafka_topic='user_events',
value_format='json',
partitions=8
);
`
if err := client.Execute(s1); err != nil {
panic(err)
}
}
func main() {
http.HandleFunc("/emit", hello)
http.ListenAndServe(":4201", nil)
}
func hello(w http.ResponseWriter, req *http.Request) {
userId := req.URL.Query().Get("userId")
if userId == "" {
http.Error(w, "no userId", 400)
return
}
userEvent := req.URL.Query().Get("event")
if userEvent == "" {
userEvent = "unknown"
}
err := client.Execute(fmt.Sprintf("INSERT INTO userEvents (userId, eventType) VALUES ('%s', '%s');",
userId, userEvent))
if err != nil {
http.Error(w, err.Error(), 500)
return
}
w.WriteHeader(200)
return
}
此應用程式創建一個資料流并公開一個端點以使用資料填充流。
另外,我有一個消費者:
package main
import (
"context"
"fmt"
"github.com/rmoff/ksqldb-go"
)
var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()
func main() {
query := `SET 'auto.offset.reset' = 'earliest';`
if err := client.Execute(query); err != nil {
panic(err)
}
ctx := context.TODO()
rows := make(chan ksqldb.Row)
headers := make(chan ksqldb.Header)
go func() {
if err := client.Push(ctx,
"SELECT * FROM userEvents EMIT CHANGES;",
rows,
headers); err != nil {
panic(err)
}
}()
h := <-headers
fmt.Printf("headers: [%v]", h)
for {
select {
case r := <-rows:
fmt.Printf("received event: [%v]", r)
}
}
}
我使用相同的查詢運行一個生產者和多個消費者。如何(并且有可能?)僅在一個消費者上接收事件?現在,通過這樣的設定,我在所有可用的消費者上接收到這些事件,但我想在一個單一的消費者上處理事件(處理會很長,所以我需要這個來實作并行性)。
老實說,我認為這是“標準”,所有連接的應用程式都屬于同一個組,而這種交付我是免費的。
本地集群配置(它是來自 Confluentic how-to-start 的標準配置):
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.0.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
ksqldb-server:
image: confluentinc/ksqldb-server:0.23.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: broker:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.23.1
container_name: ksqldb-cli
depends_on:
- broker
- ksqldb-server
entrypoint: /bin/sh
tty: true
我的配置有問題還是我誤解了這個資料庫的用法?謝謝你的幫助!
uj5u.com熱心網友回復:
首先,請注意我不再維護該客戶端,您可能想查看https://github.com/thmeitz/ksqldb-go。
現在回答你的問題。如果我理解正確,您希望出于并行目的運行同一邏輯使用者的多個實體,因此每條訊息都應由該邏輯使用者處理一次。
如果是這種情況,那么您正在描述 Kafka 中所謂的消費者組。消費者的多個實體使用相同的客戶端 ID 標識自己,Kafka 確保來自源主題磁區的資料被路由到該組中的可用消費者。如果有四個消費者和八個磁區,每個消費者將從兩個磁區獲取資料。如果一個消費者離開了該組(它崩潰,您縮小規模等),那么 Kafka 會將該消費者的磁區重新分配給該組的剩余消費者。
這與您所看到的行為不同,您在其中有效地實體化了多個獨立的消費者。按照設計,Kafka 確保訂閱某個主題的每個消費者都能接收到該主題的所有訊息。
我這里特意說的是Kafka,而不是ksqlDB。這是因為 ksqlDB 是建立在 Kafka 之上的,為了理解您所看到的內容,解釋基礎知識很重要。
要獲得您正在尋找的行為,您可能希望直接在您的消費者應用程式中使用消費者 API。您可以在此 Golang 和 Kafka 快速入門中查看消費者 API 的示例。要創建一個消費者組,您需要指定一個唯一的group.id.
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/424129.html
