kafka-python的安裝、基本使用與zookeeper啟動等請參考:使用python連接kafka
自定義consumer讀取的offset寫法
注意在kafka-python中使用消費者自定義offset的讀取順序時,消費者的寫法:
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

# Build the consumer without naming a topic; the partition is attached by hand below.
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    api_version=(0, 10, 2),
)
# TopicPartition takes the topic name and the partition number.
tp = TopicPartition(topic="python_test", partition=0)
# Declare that this consumer manages this partition manually.
consumer.assign([tp])
讀取資料的時候使用:
# Set the read position by hand, e.g. start reading at offset 3 ...
consumer.seek(partition=tp, offset=3)
# ... or start reading at offset 50 (the most recent seek is the one that applies).
consumer.seek(partition=tp, offset=50)
# Read one record from the current position.
next(consumer)
ATTENTION:初始化consumer時,不能使用這種一次把topic等引數都傳進建構式的形式:
# Anti-example: do NOT initialize the consumer like this (topic passed straight
# into the constructor) when the read offset is going to be set manually.
consumer = KafkaConsumer(self.topic, bootstrap_servers='localhost:9092', auto_offset_reset='latest', api_version=(0, 10, 2))
示例代碼
1. 啟動生產者,創建資料
首先啟動生產者:
import datetime
import json

from kafka import KafkaProducer

# Start the producer.
producer = KafkaProducer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
my_topic = "python_test"
try:
    for i in range(100):
        # Each message carries its sequence number and the time it was created.
        data = {'num': i, 'data': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        # .get() waits (up to 30 s) for the result of this send before the next one.
        producer.send(my_topic, json.dumps(data).encode('utf-8')).get(timeout=30)
finally:
    # Release the producer's connections even if a send fails.
    producer.close()
這裡使用一個名為python_test的topic,然後往裡面寫入100筆資料
2. 消費者自定義offset讀取
import random

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
tp = TopicPartition("python_test", 0)  # arguments: topic name, partition number
consumer.assign([tp])  # manage this partition manually
try:
    # Only offsets in [first_offset, next_offset) hold a record. Seeking to an
    # offset with no record (e.g. 100 when only 0-99 exist) would leave
    # next(consumer) waiting for a message that may never arrive.
    first_offset = consumer.beginning_offsets([tp])[tp]
    next_offset = consumer.end_offsets([tp])[tp]
    if first_offset >= next_offset:
        raise SystemExit("partition is empty: run the producer first")
    for i in range(10):
        # Pick a random existing offset to read from.
        random_seek = random.randrange(first_offset, next_offset)
        consumer.seek(tp, random_seek)  # set the read position
        # Read one record; after next() the position advances by one.
        consumer_data = next(consumer)
        print(consumer_data)
finally:
    # Release the consumer's connections even if reading fails.
    consumer.close()
列印結果:
ConsumerRecord(topic='python_test', partition=0, offset=66, timestamp=1646473931338, timestamp_type=0, key=None, value=b'{"num": 61, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1900770981, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=87, timestamp=1646473931355, timestamp_type=0, key=None, value=b'{"num": 82, "data": "2022-03-05 17:52:11"}', headers=[], checksum=3522714781, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=71, timestamp=1646473931342, timestamp_type=0, key=None, value=b'{"num": 66, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1871660722, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=50, timestamp=1646473931318, timestamp_type=0, key=None, value=b'{"num": 45, "data": "2022-03-05 17:52:11"}', headers=[], checksum=4078097399, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=11, timestamp=1646473931271, timestamp_type=0, key=None, value=b'{"num": 6, "data": "2022-03-05 17:52:11"}', headers=[], checksum=2130074065, serialized_key_size=-1, serialized_value_size=41, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=71, timestamp=1646473931342, timestamp_type=0, key=None, value=b'{"num": 66, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1871660722, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=83, timestamp=1646473931352, timestamp_type=0, key=None, value=b'{"num": 78, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1428608549, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=25, timestamp=1646473931285, timestamp_type=0, key=None, value=b'{"num": 20, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1811806078, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=36, timestamp=1646473931296, timestamp_type=0, key=None, value=b'{"num": 31, "data": "2022-03-05 17:52:11"}', headers=[], checksum=330696171, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=18, timestamp=1646473931278, timestamp_type=0, key=None, value=b'{"num": 13, "data": "2022-03-05 17:52:11"}', headers=[], checksum=719857123, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/438080.html
標籤:其他
上一篇:kafka啟動報錯:Failed to acquire lock on file .lock in /tmp/kafka-log
