現在專案中kafka環境測驗結果如下:
比如一次拉500條資料下來處理,
當處理第100條時由于資料問題導致程式報錯了,那么101-500這些資料就沒被消費了
下一批資料就會從501-1000
以前看到過這種情況,如果一批資料處理中有任何報錯,下次消費從報錯這批資料的第一個offset繼續消費,這樣保證每批資料至少不漏消費
請問實作這種場景怎么配置的,現在用kafka+springboot測驗不是這種效果
最后,還有什么更好的方法,解決這種漏消費問題么,盡量保證不丟,適當重復也可以
uj5u.com熱心網友回復:
有沒有大佬解答下uj5u.com熱心網友回復:
zookeeper里面保存了消費組的offset,修改它就可以uj5u.com熱心網友回復:
手動修改?代碼咋操作或者咋配置呢,可以詳細點么
uj5u.com熱心網友回復:
有沒有大佬知道uj5u.com熱心網友回復:
有沒有大佬解惑下uj5u.com熱心網友回復:
你別自動提交啊,代碼改為手動提交offset啊uj5u.com熱心網友回復:
是可以手動提交實作
但是kafka可以實作拉一批資料,這批中有拋錯,那下一批還是會從有錯誤的這批第一個offset開始消費,雖然可能重但不漏,想知道這個怎么配置或者實作的,而且這種方式是自動提交offset
uj5u.com熱心網友回復:
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題uj5u.com熱心網友回復:
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
uj5u.com熱心網友回復:
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
uj5u.com熱心網友回復:
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
uj5u.com熱心網友回復:
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
@StreamListener才是批量的吧
uj5u.com熱心網友回復:
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
@StreamListener才是批量的吧
@KafkaListener可以配成批量并發
uj5u.com熱心網友回復:
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
@StreamListener才是批量的吧
@KafkaListener可以配成批量并發
沒配過
uj5u.com熱心網友回復:
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
@StreamListener才是批量的吧
@KafkaListener可以配成批量并發
沒配過
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
@StreamListener才是批量的吧
@KafkaListener可以配成批量并發
沒配過
大佬,比如現在topic的數量是可配的,所以要監聽可變的topic,名稱是有規律的,所以想到用模糊匹配去做@KafkaListener(topicPattern = "test.*") 可以匹配多個topic,
但如果想匹配上幾個topic就要起幾個執行緒處理相應topic的訊息怎么實作?比如第一個執行緒消費第一個topic,第二個執行緒處理第二個topic內容這樣,topic是有規律的,test_1, test_2等等,配的幾就幾
這咋實作比較好
uj5u.com熱心網友回復:
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
@StreamListener才是批量的吧
@KafkaListener可以配成批量并發
沒配過
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
@StreamListener才是批量的吧
@KafkaListener可以配成批量并發
沒配過
大佬,比如現在topic的數量是可配的,所以要監聽可變的topic,名稱是有規律的,所以想到用模糊匹配去做@KafkaListener(topicPattern = "test.*") 可以匹配多個topic,
但如果想匹配上幾個topic就要起幾個執行緒處理相應topic的訊息怎么實作?比如第一個執行緒消費第一個topic,第二個執行緒處理第二個topic內容這樣,topic是有規律的,test_1, test_2等等,配的幾就幾
這咋實作比較好
https://www.cnblogs.com/gaoyawei/p/7723974.html
希望對你有用,參考下
uj5u.com熱心網友回復:
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
@StreamListener才是批量的吧
@KafkaListener可以配成批量并發
沒配過
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
@StreamListener才是批量的吧
@KafkaListener可以配成批量并發
沒配過
大佬,比如現在topic的數量是可配的,所以要監聽可變的topic,名稱是有規律的,所以想到用模糊匹配去做@KafkaListener(topicPattern = "test.*") 可以匹配多個topic,
但如果想匹配上幾個topic就要起幾個執行緒處理相應topic的訊息怎么實作?比如第一個執行緒消費第一個topic,第二個執行緒處理第二個topic內容這樣,topic是有規律的,test_1, test_2等等,配的幾就幾
這咋實作比較好
https://www.cnblogs.com/gaoyawei/p/7723974.html
希望對你有用,參考下
這是用原生api的方式,然后創建執行緒,
但現在的邏輯都是注解實作監聽@KafkaListener,
想根據配置的topic數量,比如配了10,那么就創建10個帶@KafkaListener的方法,因為一個帶@KafkaListener的方法就是單獨一個執行緒處理的,但是@KafkaListener又不能用在方法內部所以大佬知不知道這咋實作
uj5u.com熱心網友回復:
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
@StreamListener才是批量的吧
@KafkaListener可以配成批量并發
沒配過
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
@StreamListener才是批量的吧
@KafkaListener可以配成批量并發
沒配過
大佬,比如現在topic的數量是可配的,所以要監聽可變的topic,名稱是有規律的,所以想到用模糊匹配去做@KafkaListener(topicPattern = "test.*") 可以匹配多個topic,
但如果想匹配上幾個topic就要起幾個執行緒處理相應topic的訊息怎么實作?比如第一個執行緒消費第一個topic,第二個執行緒處理第二個topic內容這樣,topic是有規律的,test_1, test_2等等,配的幾就幾
這咋實作比較好
https://www.cnblogs.com/gaoyawei/p/7723974.html
希望對你有用,參考下
這是用原生api的方式,然后創建執行緒,
但現在的邏輯都是注解實作監聽@KafkaListener,
想根據配置的topic數量,比如配了10,那么就創建10個帶@KafkaListener的方法,因為一個帶@KafkaListener的方法就是單獨一個執行緒處理的,但是@KafkaListener又不能用在方法內部所以大佬知不知道這咋實作
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
@StreamListener才是批量的吧
@KafkaListener可以配成批量并發
沒配過
組態檔里面有個autoCommitOffset設定為false就不會自動提交了,如果你批量消費kafka的資料,當你監聽到一批資料以后,建議批量處理這批資料,而不是單個單個處理,原則就是:批量監聽,批量處理(例如存資料庫),如果你批量監聽一批資料,然后單個處理,是會存在你說的可能重但不漏的問題
是的,拉一批資料,然后一條條遍歷處理(比如判斷欄位格式是否正確,是否是想要的資料等等),但是處理中只要拋錯就剩下的就不處理了;
但是看到過別人在一批中有拋錯然后下一批還會拉取這批有錯的,直到這批沒有錯誤就不再拉取,offset也是自動提交,疑惑通過kafka自身是怎么實作的
不管你設定的是自動提交還是手動提交,你監聽kafka的那個方法,也就是有@StreamListener這個注解或者@KafkaListener注解的方法,當這個方法里面沒有發生錯誤,順利執行完,你的offset才會自動提交,如果有任意一個錯誤,也就是這個方法沒有執行完,例外終止了,那并不會提交offset,除非你在里面做了try catch機制,自己吞了例外,才會導致這批資料拋錯了,但是下次拉取會跳過這批資料的問題
之前測過別人代碼,好像直接while回圈一直poll,每次拉一批,比如這批中拋例外了,沒有catch,重啟下,這批資料還是會在,又從這批第一個訊息消費,只要某批資料拋例外下次一定會有拉下來,除非這批資料沒有錯誤全部處理完;
但是用springboot寫了個測驗,用@KafkaListener監聽,就是出現不了這種情形,只要一批資料中拋錯不catach,下次重啟這批資料就沒有了,不會像上面那樣只有有例外的那批資料還會被拉下來處理;
所以好奇這是怎么實作的
@StreamListener才是批量的吧
@KafkaListener可以配成批量并發
沒配過
大佬,比如現在topic的數量是可配的,所以要監聽可變的topic,名稱是有規律的,所以想到用模糊匹配去做@KafkaListener(topicPattern = "test.*") 可以匹配多個topic,
但如果想匹配上幾個topic就要起幾個執行緒處理相應topic的訊息怎么實作?比如第一個執行緒消費第一個topic,第二個執行緒處理第二個topic內容這樣,topic是有規律的,test_1, test_2等等,配的幾就幾
這咋實作比較好
https://www.cnblogs.com/gaoyawei/p/7723974.html
希望對你有用,參考下
這是用原生api的方式,然后創建執行緒,
但現在的邏輯都是注解實作監聽@KafkaListener,
想根據配置的topic數量,比如配了10,那么就創建10個帶@KafkaListener的方法,因為一個帶@KafkaListener的方法就是單獨一個執行緒處理的,但是@KafkaListener又不能用在方法內部所以大佬知不知道這咋實作
沒這么玩過,所以我也沒經驗
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/111703.html
標籤:Java EE
