關于長連接的一些介紹
長連接的應用場景非常的廣泛,比如監控系統,IM系統,即時報價系統,推送服務等等,像這些場景都是比較注重實時性,如果每次發送資料都要進行一次DNS決議,建立連接的程序肯定是極其影響體驗,
長連接的維護必然需要一套機制來控制,比如 HTTP/1.0 通過在 header 頭中添加 Connection:Keep-Alive引數,如果當前請求需要保活則添加該引數作為標識,否則服務端就不會保持該連接的狀態,發送完資料之后就關閉連接,HTTP/1.1以后 Keep-Alive 是默認打開的,
Netty 是 基于 TCP 協議開發的,在四層協議 TCP 協議的實作中也提供了 keepalive 報文用來探測對端是否可用,TCP 層將在定時時間到后發送相應的 KeepAlive 探針以確定連接可用性
Netty狀態回呼代碼
class ClientHandler : ChannelInboundHandlerAdapter() {
/** 客戶端請求的心跳命令 */
private val heartBeat =
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("heartbeat", CharsetUtil.UTF_8))
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
//接收到訊息
}
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
//添加心跳
if (evt is IdleStateEvent) {
if (IdleState.WRITER_IDLE?.equals(evt.state())) {
ctx.writeAndFlush(heartBeat.duplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
}
super.userEventTriggered(ctx, evt)
}
override fun channelActive(ctx: ChannelHandlerContext) {
//連接成功回呼
ctx.fireChannelActive()
}
override fun channelInactive(ctx: ChannelHandlerContext) {
super.channelInactive(ctx)
//中途斷開回呼
}
override fun exceptionCaught(
ctx: ChannelHandlerContext,
cause: Throwable
) {
//連接拋出例外回呼
cause.printStackTrace()
ctx.close()
}
override fun handlerRemoved(ctx: ChannelHandlerContext) {
super.handlerRemoved(ctx)
//與服務斷開連接時的回呼
NettyObserverManager.instance.notifyObserver(ctx)//觀察者發送斷開通知 代碼在下面
}
}
添加回呼的管理類
class SimpleChatClientInitializer :
ChannelInitializer<SocketChannel>() {
@Throws(Exception::class)
override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()
//心跳每次像服務端發送的頻率
pipeline.addLast(IdleStateHandler(4, 4, 4, TimeUnit.SECONDS))
pipeline.addLast("decoder", StringDecoder())
pipeline.addLast("encoder", StringEncoder())
//添加自定義的監聽回呼物件
pipeline.addLast("handler", ClientHandler())
}
}
使用服務開啟長連接
class NotifyService : Service(), ChannelFutureListener, NettyObserverListener {
private var channel: Channel? = null
private var host = "xxx.xxx.xxx"//與后臺統一的IP地址
private val port = 0//與后臺統一的埠號
private var nio: NioEventLoopGroup? = null
private val clientInitializer = SimpleChatClientInitializer()
private val grayServiceId = 1001
//當前服務物件
private val mBinder = ClientBinder()
override fun onBind(intent: Intent): IBinder {
return mBinder
}
override fun onCreate() {
super.onCreate()
//添加觀察者
NettyObserverManager.instance.add(this)
init()
}
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
//設定前臺服務提高優先級
if (Build.VERSION.SDK_INT < Build.VERSION_CODES.O) {
//Android4.3 - Android8.0,隱藏Notification上的圖示
val innerIntent = Intent(this, GrayInnerService::class.java)
startService(innerIntent)
startForeground(grayServiceId, Notification())
} else {
//Android8.0以上app啟動后通知欄會出現一條"正在運行"的通知
val channel = NotificationChannel(
"com.guanwei.pddemo", "notify",
NotificationManager.IMPORTANCE_HIGH
)
val manager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
manager.createNotificationChannel(channel)
val notification = Notification.Builder(
applicationContext,
"com.guanwei.pddemo"
).build()
startForeground(grayServiceId, notification)
}
return START_STICKY
}
/**
* 服務在后臺灰色保活
*/
inner class GrayInnerService : Service() {
override fun onBind(intent: Intent?): IBinder? {
return null
}
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
startForeground(grayServiceId, Notification())
stopForeground(true)
stopSelf()
return super.onStartCommand(intent, flags, startId)
}
}
private fun init() {
Thread {
kotlin.run {
if (nio == null) {
nio = NioEventLoopGroup()
}
//開始連接
doConnect()
}
}.start()
}
private fun doConnect() {
try {
channel = Bootstrap().run {
//連接建立
group(nio)
channel(NioSocketChannel::class.java)
handler(clientInitializer)
connect(
host,
port
).sync().channel()
}
val map = HashMap<String, Long>()
map["id"] = SPUtils.getInstance().getLong("userId")
val json = JSONObject(map as Map<*, *>).toString()
sendMessage(json)
} catch (e: Exception) {
e.stackTrace
}
}
fun sendMessage(msg: String) {
//向服務端發送訊息
channel!!.writeAndFlush(msg).addListener(this)
}
/**
* 連接失敗回呼
*/
override fun operationComplete(future: ChannelFuture) {
if (!future.isSuccess) {
//連接失敗重新連接
//每三秒進行一次重連接
future.channel().eventLoop().schedule({
kotlin.run {
doConnect()
}
}, 3, TimeUnit.SECONDS)
} else {
LogUtils.a("連接成功")
}
}
override fun nettyObserverUpData(v: ChannelHandlerContext) {
//每三秒進行一次重連接
v.channel().eventLoop().schedule({
kotlin.run {
doConnect()
}
}, 3, TimeUnit.SECONDS)
}
override fun onDestroy() {
super.onDestroy()
//移除觀察者
NettyObserverManager.instance.remove(this)
}
/**
* 與activity進行互動
*/
inner class ClientBinder : Binder() {
public fun getService(): NotifyService {
return this@NotifyService
}
public fun sendMsg(msg: String) {
sendMessage(msg)
}
}
}
封裝觀察者進行狀態監聽與通知
/**
* netty狀態觀察者介面
*/
interface NettyObserverListener {
/**
* 重繪操作
*/
fun nettyObserverUpData(v: ChannelHandlerContext)
}
/**
* netty觀察者操作介面
*/
interface NettySubjectListener {
/**
* 添加
*/
fun add(nettyObserverListener: NettyObserverListener)
/**
* 通知內容
*/
fun notifyObserver(t: ChannelHandlerContext)
/**
* 洗掉
*/
fun remove(nettyObserverListener: NettyObserverListener)
}
class NettyObserverManager : NettySubjectListener {
/**
* 資料集合
*/
private val list = ArrayList<NettyObserverListener>()
/**
* 添加
*/
override fun add(nettyObserverListener: NettyObserverListener) {
list.add(nettyObserverListener)
}
/**
* 更新
*/
override fun notifyObserver(t: ChannelHandlerContext) {
//資料更新
for (ol in list) {
ol.nettyObserverUpData(t)
}
}
/**
* 移除
*/
override fun remove(nettyObserverListener: NettyObserverListener) {
list.remove(nettyObserverListener)
}
companion object {
val instance: NettyObserverManager by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
NettyObserverManager()
}
}
}
在Activity中開啟、系結服務
服務開啟與系結所需要的引數如何獲取就不多講了,不會的建議回去重新學一下子服務
//先開啟服務
startService(iIntent)
//系結服務
bindService(
iIntent,
myConnection,
Context.BIND_AUTO_CREATE
)
override fun onDestroy() {
super.onDestroy()
//服務解綁
unbindService(myConnection)
//關閉服務
stopService(iIntent)
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/297169.html
標籤:其他
