架構
客戶打算把聊天內容發送到php server
php server 接收訊息并進行相關業務處理,然后publish到redis
node js 作為mqtt 的broker 訂閱redis

broker 安裝
aedes可以在任何流服務器上運行的準系統 MQTT 服務器
- 建立一個檔案夾,并在該檔案夾內打開終端初始化專案
$ npm init -y
$ npm install aedes
- 安裝redis mqemitter
$ npm install mqemitter-redis --save
新建index.js檔案
var http = require('http')
var websocket = require('websocket-stream')
require('events').EventEmitter.defaultMaxListeners = 0
var mq = require('mqemitter-redis')({
port: '6379',
host: '127.0.0.1',
password: '123456',
db: 2
})
var aedes = require('aedes')({
concurrency: 666,
mq: mq
})
// 身份驗證
aedes.authenticate = function (client, username, password, callback) {
callback(null, (username === 'user' && password.toString() === '123456'));
}
//禁止pub
aedes.authorizePublish = function(client, packet, callback) {
callback(new Error('no publish'));
};
//sub訊息
aedes.authorizeSubscribe = function (client, sub, callback) {
if (sub.topic === 'aaaa') {
return callback(new Error('wrong topic'))
}
if (sub.topic === 'bbb') {
// overwrites subscription
sub.topic = 'foo'
sub.qos = 1
}
callback(null, sub)
}
var server = require('net').createServer(aedes.handle)
server.listen(7888, function () {
console.log('===server listening on port===', 1883)
})
var httpServ = http.createServer()
websocket.createServer({server: httpServ, port: 1884}, aedes.handle)
我們根據mqemitter-redis 拼裝redis publish資料結構
mqemitter-redis.js
function handler (sub, topic, payload) {
var packet = msgpack.decode(payload)
//console.log("packet:"+packet.id)
if(that.redisCache.set("p:"+packet.id, 1, "EX", 10, "NX")){
//console.log("packet:"+packet.id+":ok")
that._emit(packet.msg)
}
}
根據以上代碼我們可以得到
- 訊息需要msgpack壓縮
- 必須有id,msg 欄位
- msg 資料結構參考mqtt-packet 的publish結構
{
cmd: 'publish',
messageId: 42,
qos: 1,
dup: false,
topic: 'test',
payload: new Buffer('test'),
retain: false,
properties: { // optional properties MQTT 5.0
payloadFormatIndicator: true,
messageExpiryInterval: 4321,
topicAlias: 100,
responseTopic: 'topic',
correlationData: Buffer.from([1, 2, 3, 4]),
userProperties: {
'test': 'test'
},
subscriptionIdentifier: 120, // can be an Array in message from broker, if message included in few another subscriptions
contentType: 'test'
}
}
php server 代碼
server.php
public function send($topic, $content){
$message = [];
$message['id'] = uuid();
$message['msg'] = [];
$message['msg']['cmd'] = 'publish';
$message['msg']['retain'] = false;
$message['msg']['qos'] = 2;
$message['msg']['topic'] = $topic;
$message['msg']['payload'] = $content;
$message['msg']['messageId'] = uuid();
//$message['msg']['properties'] = ['messageExpiryInterval':125]; //optional properties
//發布訊息
(\Redis ())->publish($topic, msgpack_pack($message));
}
$this->send('test','我就測驗下');
$this->send('bbb','我就測驗下');
把server.php 改成接收api 傳遞訊息
前段監聽1884埠
到此整個用MQTT 實作聊天功能的架構就完成了
總結
- 官方對于aedes的檔案不甚詳盡,對于我這node 渣渣來說還是很費勁
- 官方檔案有介紹用Redis實作集群,雖然MQTT本是為物聯網設計,但就我們的業務而言也是夠用
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/291942.html
標籤:其他
