我正在嘗試動態更改 mqtt 客戶端處理程式和證書,這會在訂閱者和發布者連接時導致訂閱者 EOF
這就是我正在嘗試做的
1] 我正在初始化訂閱者/發布者(使用 firstPubHandler、firstConnectHandler 和默認證書)
2]使用發布者在服務器上發送注冊訊息以獲取新證書的詳細資訊
3] 服務器將回應證書詳細資訊,該回應將由 firstConnectHandler 在主題.../id/Certificate 上處理以下載證書。
4] firstPubHandler 將處理服務器的回應并重新初始化發布者/訂閱者(使用 messagePubHandler、connectHandler 和新下載的證書),connectHandler 將偵聽所有主題/id/
一切正常,除了當我重新初始化訂閱者/發布者時,訂閱者不斷斷開連接并出現錯誤“EOF”
我在這里做錯了什么嗎?或者有沒有更好的方法來實作這一點?任何幫助表示贊賞
- 主功能
var opt Params
var publisher mqtt.Client
var subscriber mqtt.Client
func main() {
InitializeBroker(firstPubHandler, firstConnectHandler)
//Ultimately it will trigger message on ".../id/Certificate" topic which will be handled byfirstConnectHandler
PublishRegistrationMessage(publisher)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGTERM)
go func() {
for {
}
}()
<-done
<-c
DisconnectBrocker()
}
-- 處理程式
// First handlers
var firstPubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
DownloadCertificates(msg.Payload())
InitializeBroker(messagePubHandler, connectHandler)
}
var firstConnectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {
if token := c.Subscribe(opt.SubClientId "/id/Certificate", 0, firstPubHandler); token.Wait() && token.Error() != nil {
log.Error(token.Error())
}
}
// Second handlers
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
ProcessMessage(msg.Payload())
}
var connectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {
if token := c.Subscribe(opt.SubClientId "/id/ ", 0, messagePubHandler); token.Wait() && token.Error() != nil {
log.Error(token.Error())
}
}
// Common handler
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
log.Info(err)
}
--MQTT 代理初始化
func InitializeBroker(lMessageHandler mqtt.MessageHandler, lConnectHandler mqtt.OnConnectHandler) {
statusPublishTopic := opt.PubClientId/id
nodeSubscribeTopic := opt.SubClientId/id
// Build the options for the publish client
publisherOptions := mqtt.NewClientOptions()
publisherOptions.AddBroker(opt.Broker)
publisherOptions.SetClientID(statusPublishTopic)
publisherOptions.SetDefaultPublishHandler(lMessageHandler)
publisherOptions.OnConnectionLost = connectLostHandler
// Build the options for the subscribe client
subscriberOptions := mqtt.NewClientOptions()
subscriberOptions.AddBroker(opt.Broker)
subscriberOptions.SetClientID(nodeSubscribeTopic)
subscriberOptions.SetDefaultPublishHandler(lMessageHandler)
subscriberOptions.OnConnectionLost = connectLostHandler
subscriberOptions.OnConnect = lConnectHandler
if !opt.NoTLS {
tlsconfig, err := NewTLSConfig()
if err != nil {
log.Fatalf(err)
}
subscriberOptions.SetTLSConfig(tlsconfig)
publisherOptions.SetTLSConfig(tlsconfig)
}
publisher = mqtt.NewClient(publisherOptions)
if token := publisher.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf(token.Error())
}
subscriber = mqtt.NewClient(subscriberOptions)
if token := subscriber.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf(token.Error())
}
}
func NewTLSConfig() (config *tls.Config, err error) {
certpool := x509.NewCertPool()
pemCerts, err := ioutil.ReadFile(rootCert)
if err != nil {
return nil, err
}
certpool.AppendCertsFromPEM(pemCerts)
cert, err := tls.LoadX509KeyPair(nodeCertFilePath, pvtKeyFilePath)
if err != nil {
return nil, err
}
config = &tls.Config{
RootCAs: certpool,
ClientAuth: tls.NoClientCert,
ClientCAs: nil,
Certificates: []tls.Certificate{cert},
}
return config, nil
}
uj5u.com熱心網友回復:
基于對您的代碼的快速審查,這就是我認為正在發生的事情(因為您沒有提供所有代碼,因此涉及一些猜測):
main()InitializeBroker創建到代理的兩個連接的呼叫。默認發布處理程式設定為firstPubHandler并在OnConnect您訂閱的處理程式中SubClientId "/id/Certificate- 當收到訊息 (
firstPubHandler) 時,您從訊息中獲取證書,并使用它來建立一組新的連接到代理的連接,這些連接使用相同的客戶端 ID 但不同的OnConnect/default 發布處理程式。
因此,在第 2 點之后,您實際上有兩組單獨的連接到代理(總共 4 個連接)。但是MQTT-3.1.4-2(請參閱規范)指出:
如果 ClientId 代表已經連接到服務器的客戶端,那么服務器必須斷開與現有客戶端的連接。
因此,當建立第二組連接時,代理將丟棄第一組連接。這是您看到的“EOF”斷開連接。第二組連接仍將啟動。由于您connectLostHandler對第一組和第二組連接使用相同的連接,因此您無法在日志中看到哪個連接被終止。
總之,我相信您的代碼確實有效。但是,您可能應該呼叫c.Disconnect(),firstConnectHandler以便在建立第二組連接之前完全關閉初始連接。您還需要將其存盤在publisher某個地方,以便同時斷開該連接。
注意:我很難理解你為什么要這樣做。建立初始連接以檢索證書似乎會降低系統的整體安全性。標準方法是給每個客戶端一個唯一的證書,然后在代理上使用 ACL 來應用任何必要的限制。對于許多代理,您可以在 ACL 中使用證書公用名(從而消除對第二個連接的需要)。
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/347822.html
