前置
1.有一個可運行的SpringBoot專案
2.已經安裝好zookeeper和kafka
沒有安裝好的請看我的上一篇博客安裝kafka 地址如下:
https://blog.csdn.net/zhanghengchao123/article/details/122149237
執行步驟
呼叫新增、編輯、洗掉之類的方法中,用kafka發送這個訊息給其他方法監聽
springboot專案集成kafka
-
匯入依賴
<!-- kafka異步通信--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- hutool工具包--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.9</version> </dependency> -
配置yml檔案
kafka:
consumer:
zookeeper.connect: localhost:2181
servers: localhost:9092
enable.auto.commit: false
session.timeout: 60000
auto.commit.interval: 100
auto.offset.reset: latest
group.id: consumer-group
concurrency: 3
max_poll_interval_ms: 60000
max_poll_records: 20
producer:
servers: localhost:9092
retries: 3
batch.size: 16384
linger: 1
buffer.memory: 3355432
- 業務層方法呼叫kafka 發送訊息
// 先注入kafka 模板類
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
//kafka 不能直接發送物體類所以要 轉換成json格式的字串
JSON parse = JSONUtil.parse(你的物體類);
kafkaTemplate.send("topicdemo",parse.toString());

- 消費者消費(建一個監聽類如下)
@Slf4j
@Component
public class KafkaListenerDemo {
@KafkaListener(topics = "topicdemo",groupId = "consumer-group")
public void handle(ConsumerRecord<?, ?> record ){
Object value = record.value();
log.info("消費者進來了!"+record.toString());
DemoTeacher teacher=JSONUtil.toBean(value.toString(),DemoTeacher.class);
log.info("老師的值:--------"+teacher.toString());
}
}
- 執行你的springboot專案并呼叫業務方法輸出

如果幫到你麻煩點個贊
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/394191.html
標籤:其他
上一篇:HBaes shell輸入命令報錯:hbase ERROR: Can‘t get master address from ZooKeeper; znode data == null
下一篇:Elasticsearch(二)
