有一個開箱即用的 Kafka 服務器并具有以下腳本
#!/usr/bin/perl
use Net::Kafka::Producer;
use AnyEvent;
my $condvar = AnyEvent->condvar;
my $producer = Net::Kafka::Producer->new(
'bootstrap.servers' => 'localhost:9092'
);
for (my $index = 1;;$index ) {
my $msg = "message: " . $index;
$producer->produce(
payload => $msg,
topic => "tracked-coords"
)->then(sub {
my $delivery_report = shift;
$condvar->send;
print "Message successfully delivered with offset " . $delivery_report->{offset};
}, sub {
my $error = shift;
$condvar->send;
die "Unable to produce a message: " . $error->{error} . ", code: " . $error->{code};
});
}
為什么 Kafka 服務器在 10 萬條訊息處停止?
編輯
服務器停止報告它正在接收訊息。消費者也停止接收訊息
編輯
Kafka 服務器記錄這個(最后)
message: 99998
message: 99999
message: 100000
[2022-03-21 14:43:30,597] INFO [ProducerStateManager partition=tracked-coords-0] Wrote producer snapshot at offset 500000 with 0 producer ids in 15 ms. (kafka.log.ProducerStateManager)
[2022-03-21 14:43:30,598] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Rolled new log segment at offset 500000 in 18 ms. (kafka.log.Log)
[2022-03-21 14:43:30,599] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Deleting segment LogSegment(baseOffset=400000, size=2191596, lastModifiedTime=1647873685289, largestRecordTimestamp=Some(1647873685290)) due to retention time 2000ms breach based on the largest record timestamp in the segment (kafka.log.Log)
[2022-03-21 14:43:30,610] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Incremented log start offset to 500000 due to segment deletion (kafka.log.Log)
[2022-03-21 14:44:30,610] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Deleting segment files LogSegment(baseOffset=400000, size=2191596, lastModifiedTime=1647873685289, largestRecordTimestamp=Some(1647873685290)) (kafka.log.Log$)
[2022-03-21 14:44:30,612] INFO Deleted log /tmp/kafka-logs/tracked-coords-0/00000000000000400000.log.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,612] INFO Deleted offset index /tmp/kafka-logs/tracked-coords-0/00000000000000400000.index.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,612] INFO Deleted time index /tmp/kafka-logs/tracked-coords-0/00000000000000400000.timeindex.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,613] INFO Deleted producer state snapshot /tmp/kafka-logs/tracked-coords-0/00000000000000400000.snapshot.deleted (kafka.log.SnapshotFile)
這是消費者的代碼
#!/usr/bin/perl
use feature qw( say );
use Net::Kafka::Consumer;
use AnyEvent;
use Data::Dumper;
use JSON;
my $consumer = Net::Kafka::Consumer->new(
'bootstrap.servers' => 'localhost:9092',
'group.id' => 'mock_data',
'enable.auto.commit' => 'true',
);
$consumer->subscribe( [ "tracked-coords"] );
while (1) {
my $msg = $consumer->poll(1000);
if ($msg) {
$consumer->commit(); #_message(0, $msg);
say "====================================================================";
if ( $msg->err ) {
say "Error: ", Net::Kafka::Error::to_string($err);
} else {
say $msg->payload;
}
}
}
消費者停在100K
uj5u.com熱心網友回復:
由于您使用的是使用 librdkafka 庫的 Net::Kafka,因此可能是queue.buffering.max.messages設定。默認為 100,000。意義:
生產者佇列上允許的最大訊息數。此佇列由所有主題和磁區共享。請參閱:https ://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html
嘗試在您的 Net::Kafka::Producer->new() 呼叫中將其設定為較低的數字,以查看它是否會更快停止。該設定支持 1-10M 范圍。奇怪的是,我在 Kafka 服務器設定中沒有看到它,所以猜測它只是一個 edenhill 驅動程式設定。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/453020.html
上一篇:比較函式中未初始化的值在哪里?
