1、Netty如何解析多協議
前提:
專案地址:https://gitee.com/q529075990qqcom/NB-IOT.git
我們需要創建一個Maven專案,這個專案我已經寫好了,專案結構圖如下:

創建公共模塊
創建子模塊,準備好依賴Netty4.1版本
<!--
  Dependencies of the shared module: Netty 4.1 for networking, SLF4J (API plus the
  simple binding) for logging, Lombok for generated boilerplate and Kryo for
  object serialization.
-->
<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.72.Final</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.28</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.28</version>
    </dependency>
    <!--
      Pinned to a concrete version: the RELEASE meta-version resolves to whatever
      is newest at build time, which makes builds non-reproducible.
    -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.esotericsoftware</groupId>
        <artifactId>kryo</artifactId>
        <version>5.3.0</version>
    </dependency>
</dependencies>
<!-- Caption: Maven dependencies -->
序列化的定義是:將一個物件編碼成一個位元組流(I/O);而與之相反的操作被稱為反序列化。
package serializer;

/**
 * Contract for converting objects to and from their binary representation.
 *
 * @author quliang
 * @create 2022-10-20 15:16
 */
public interface Serializer {

    /**
     * Serializes an object into a byte array.
     *
     * @param obj the object to serialize
     * @return the serialized bytes
     * @throws Exception if the object cannot be serialized
     */
    byte[] serialize(Object obj) throws Exception;

    /**
     * Deserializes a byte array back into an object of the requested type.
     *
     * @param bytes the serialized bytes
     * @param clazz the type of the object to rebuild
     * @param <T>   the target type
     * @return the deserialized object
     * @throws Exception if the bytes cannot be deserialized
     */
    <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception;
}
// Caption: Custom serialization interface
package serializer; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy; import org.objenesis.strategy.StdInstantiatorStrategy; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; /** * @description: * @author: quliang * @create: 2022-10-20 15:18 **/ public class KryoSerializer implements Serializer { private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> { Kryo kryo = new Kryo(); kryo.setReferences(true); kryo.setRegistrationRequired(false); ((DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()) .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); return kryo; }); @Override public byte[] serialize(Object obj) throws Exception { try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { Output output = new Output(baos); Kryo kryo = kryoThreadLocal.get(); kryo.writeObject(output, obj); kryoThreadLocal.remove(); return output.toBytes(); } catch (IOException e) { throw new Exception("序列化失敗", e); } } @Override public <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception { try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) { Input input = new Input(bais); Kryo kryo = kryoThreadLocal.get(); Object obj = kryo.readObject(input, clazz); kryoThreadLocal.remove(); return clazz.cast(obj); } catch (IOException e) { throw new Exception("反序化失敗"); } } }Kryo實作序列化介面
我們需要解析兩種協議,因此要提前定義好這兩種協議,分別是訊息協議和登錄協議。
訊息協議相關
package protocol.msg; import lombok.Data; import lombok.Getter; /** * @description: 訊息協議: |magic|version|data| * @author: quliang * @create: 2022-12-10 20:46 **/ @Data public class MsgProtocol { @Getter private byte magic=0; @Getter private byte version=1; }訊息協議基類
package protocol.msg.request; import lombok.Data; import protocol.msg.MsgProtocol; /** * @description: * @author: quliang * @create: 2022-12-10 20:58 **/ @Data public class MsgRequest extends MsgProtocol { private String msg; }訊息請求子類
package protocol.msg.response; import lombok.Data; import protocol.msg.MsgProtocol; /** * @description: * @author: quliang * @create: 2022-12-10 20:41 **/ @Data public class MsgResponse extends MsgProtocol { private int statCode; }訊息回應子類
package encoder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import protocol.msg.MsgProtocol; import serializer.KryoSerializer; /** * @description: * @author: quliang * @create: 2022-12-10 20:53 **/ public class MsgEncoder extends MessageToByteEncoder<MsgProtocol> { @Override protected void encode(ChannelHandlerContext ctx, MsgProtocol msgProtocol, ByteBuf in) throws Exception { in.writeByte(msgProtocol.getMagic()); // in.writeByte(msgProtocol.code()); in.writeByte(msgProtocol.getVersion()); byte[] data = https://www.cnblogs.com/quliang/p/new KryoSerializer().serialize(msgProtocol); in.writeShort(data.length); in.writeBytes(data); } }訊息協議編碼
package decoder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import lombok.extern.slf4j.Slf4j; import protocol.msg.MsgProtocol; import serializer.KryoSerializer; import java.util.List; /** * @description: * @author: quliang * @create: 2022-12-10 20:52 **/ @Slf4j public class MsgDecoder extends ByteToMessageDecoder { private Class<MsgProtocol> msgClass; public MsgDecoder(Class clazz) { this.msgClass = clazz; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { try { byte magic = in.readByte(); byte version = in.readByte(); short dataSize = in.readShort(); byte[] data = https://www.cnblogs.com/quliang/p/new byte[dataSize]; in.readBytes(data); MsgProtocol baseProtocol = new KryoSerializer().deserialize(data, msgClass); out.add(baseProtocol); } catch (Exception e) { //如果解碼錯誤,將資料傳遞到下一個解碼器中 log.error("msg decoder {}",e.getMessage()); // 重置讀取位元組索引,因為上邊已經讀了(readBytes),不加這個會導致資料為空 in.resetReaderIndex(); // 這里是復制流,復制一份,防止skipBytes跳過,導致傳遞的訊息變成空; ByteBuf buff = in.retainedDuplicate(); //原因是netty不允許有位元組內容不讀的情況發生,所以采用下邊的方法解決, in.skipBytes(in.readableBytes()); //繼續傳遞到下一個解碼器中 out.add(buff); } } }訊息協議解碼
登錄協議相關
package protocol.system; import lombok.Getter; /** * @description: 登錄協議: |magic|version|code|data| * @author: quliang * @create: 2022-12-09 18:10 **/ public class LoginProtocol { @Getter private byte magic=0; @Getter private byte version=1; @Getter public byte code; }登錄協議基類
package protocol.system.request; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import protocol.system.LoginProtocol; /** * @description: * @author: quliang * @create: 2022-12-06 18:17 **/ @Data @NoArgsConstructor @AllArgsConstructor public class LoginRequest extends LoginProtocol { private String userId; private String userName; }登錄請求子類
package protocol.system.response; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import protocol.system.LoginProtocol; /** * @description: * @author: quliang * @create: 2022-12-06 18:22 **/ @Data @NoArgsConstructor @AllArgsConstructor public class LoginResponse extends LoginProtocol { private String msg; private String data; }登錄回應子類
package encoder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import protocol.system.LoginProtocol; import serializer.KryoSerializer; /** * @description: * @author: quliang * @create: 2022-12-06 22:11 **/ public class LoginEncoder extends MessageToByteEncoder<LoginProtocol> { @Override protected void encode(ChannelHandlerContext ctx, LoginProtocol baseProtocol, ByteBuf in) throws Exception { in.writeByte(baseProtocol.getMagic()); in.writeByte(baseProtocol.getCode()); in.writeByte(baseProtocol.getVersion()); byte[] data = https://www.cnblogs.com/quliang/p/new KryoSerializer().serialize(baseProtocol); in.writeShort(data.length); in.writeBytes(data); } }登錄協議編碼
package decoder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import lombok.extern.slf4j.Slf4j; import protocol.system.LoginProtocol; import serializer.KryoSerializer; import java.util.List; /** * @description: * @author: quliang * @create: 2022-12-06 17:59 **/ @Slf4j public class LoginDecoder extends ByteToMessageDecoder { private Class<LoginProtocol> clazz; public LoginDecoder(Class clazz) { this.clazz = clazz; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { try { byte magic = in.readByte(); byte code = in.readByte(); byte version = in.readByte(); short dataSize = in.readShort(); byte[] data = https://www.cnblogs.com/quliang/p/new byte[dataSize]; in.readBytes(data); LoginProtocol baseProtocol = new KryoSerializer().deserialize(data, clazz); out.add(baseProtocol); } catch (Exception e) { //如果解碼錯誤,將資料傳遞到下一個解碼器中 log.error("login decoder {}", e.getMessage()); // 重置讀取位元組索引,因為上邊已經讀了(readBytes),不加這個會導致資料為空 in.resetReaderIndex(); // 這里是復制流,復制一份,防止skipBytes跳過,導致傳遞的訊息變成空; ByteBuf buff = in.retainedDuplicate(); //原因是netty不允許有位元組內容不讀的情況發生,所以采用下邊的方法解決, in.skipBytes(in.readableBytes()); //繼續傳遞到下一個解碼器中 out.add(buff); } } }登錄協議解碼
這樣公共模塊就創建完成了
創建服務端
package com.ql; import com.ql.handler.MsgHandler; import decoder.LoginDecoder; import decoder.MsgDecoder; import com.ql.handler.LoginHandler; import encoder.LoginEncoder; import encoder.MsgEncoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import protocol.system.request.LoginRequest; import protocol.msg.request.MsgRequest; /** * @author quliang * @description 服務端 * @date 2022-12-06 17:39:14 */ @Slf4j public class IotServer { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap().group( bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LoggingHandler(LogLevel.INFO)); /** * 心跳機制 */ //pipeline.addLast(new IdleStateHandler(5, 10, 5, TimeUnit.SECONDS)); /** * 訊息、登錄解碼器 */ pipeline.addLast(new LoginDecoder(LoginRequest.class)); pipeline.addLast(new MsgDecoder(MsgRequest.class)); /** * 訊息、登錄處理器 */ pipeline.addLast(new MsgHandler()); pipeline.addLast(new LoginHandler()); /** * 訊息、登錄編碼器 */ pipeline.addLast(new MsgEncoder()); pipeline.addLast(new LoginEncoder()); } }) .option(ChannelOption.SO_BACKLOG, 1024); ChannelFuture cf = bootstrap.bind(8849).sync(); log.info("socket服務端啟動成功 {}", cf.channel().localAddress().toString()); cf.channel().closeFuture().sync(); 
} finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }服務端代碼
package com.ql.handler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; import protocol.msg.request.MsgRequest; import protocol.msg.response.MsgResponse; /** * @description: 訊息處理器 * @author: quliang * @create: 2022-12-10 20:57 **/ @Slf4j @ChannelHandler.Sharable public class MsgHandler extends SimpleChannelInboundHandler<MsgRequest> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("上線{}", ctx.channel().remoteAddress().toString()); } @Override protected void channelRead0(ChannelHandlerContext ctx, MsgRequest request) throws Exception { log.info("服務端讀取訊息體資料為{}", request.toString()); MsgResponse response = new MsgResponse(); response.setStatCode(200); ctx.channel().writeAndFlush(response); } }服務端訊息處理器
package com.ql.handler; import io.netty.channel.*; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; import protocol.system.request.LoginRequest; import protocol.system.response.LoginResponse; import java.util.concurrent.atomic.AtomicInteger; /** * @description: 登錄處理器 * @author: quliang * @create: 2022-12-06 18:14 **/ @Slf4j @ChannelHandler.Sharable public class LoginHandler extends SimpleChannelInboundHandler<LoginRequest>{ private static AtomicInteger READER_COUNT = new AtomicInteger(0); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("服務端:{} 通道開啟!", ctx.channel().localAddress().toString()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("服務端: {} 通道關閉!", ctx.channel().localAddress().toString()); } @Override protected void channelRead0(ChannelHandlerContext ctx, LoginRequest loginRequest) throws Exception { log.info("讀取資料 {} ", loginRequest.toString()); LoginResponse response= new LoginResponse("success", null); ctx.channel().writeAndFlush(response); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("...............資料接收-完畢..............."); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); log.error("...............業務處理例外...............{}", cause); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; Channel channel = ctx.channel(); switch (event.state()) { case READER_IDLE: log.info("讀空閑"); READER_COUNT.addAndGet(1); break; case WRITER_IDLE: log.info("寫空閑"); break; default: break; } ctx.disconnect(); if (READER_COUNT.get() > 3) { log.info("close this channel {}", channel.remoteAddress().toString()); } } } }服務端登錄處理器
服務端其實很多都是直接參考公共模塊的,代碼也并不復雜
創建訊息客戶端
package com.ql; import com.ql.handler.ClientMsgHandler; import decoder.MsgDecoder; import encoder.MsgEncoder; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import protocol.msg.response.MsgResponse; import java.net.InetSocketAddress; /** * @author quliang * @description 客戶端 * @date 2022-12-06 17:37:56 */ @Slf4j public class IotClientMsg { public static void main(String[] args) throws InterruptedException { EventLoopGroup clientGroup = new NioEventLoopGroup(); try { Bootstrap bs = new Bootstrap(); bs.group(clientGroup) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress("169.254.190.154", 8849)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LoggingHandler(LogLevel.INFO)); //訊息解碼器 pipeline.addLast(new MsgDecoder(MsgResponse.class)); //客戶端訊息處理器 pipeline.addLast(new ClientMsgHandler()); //訊息編碼器 pipeline.addLast(new MsgEncoder()); } }); ChannelFuture cf = bs.connect().sync(); log.info("啟動成功{}", cf.channel().localAddress().toString()); // Scanner scanner = new Scanner(System.in); cf.channel().closeFuture().sync(); } finally { clientGroup.shutdownGracefully().sync(); } } }客戶端代碼
package com.ql.handler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; import protocol.msg.request.MsgRequest; import protocol.msg.response.MsgResponse; /** * @description: * @author: quliang * @create: 2022-12-10 20:40 **/ @Slf4j @ChannelHandler.Sharable public class ClientMsgHandler extends SimpleChannelInboundHandler<MsgResponse> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { MsgRequest request = new MsgRequest(); request.setMsg("hello"); ctx.channel().writeAndFlush(request); } @Override protected void channelRead0(ChannelHandlerContext ctx, MsgResponse response) throws Exception { int code = response.getStatCode(); log.info("訊息處理器讀取回應物件資料為{}", code); } }客戶端訊息處理器
訊息客戶端代碼也并不復雜
創建登錄客戶端
package com.ql; import com.ql.handler.ClientLoginHandler; import decoder.LoginDecoder; import encoder.LoginEncoder; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import protocol.system.response.LoginResponse; import java.net.InetSocketAddress; /** * @author quliang * @description 客戶端 * @date 2022-12-06 17:37:56 */ @Slf4j public class IotClientLogin { public static void main(String[] args) throws InterruptedException { EventLoopGroup clientGroup = new NioEventLoopGroup(); try { Bootstrap bs = new Bootstrap(); bs.group(clientGroup) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress("169.254.190.154", 8849)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LoggingHandler(LogLevel.INFO)); pipeline.addLast(new LoginDecoder(LoginResponse.class)); //pipeline.addLast(new MsgDecoder(MsgResponse.class)); //pipeline.addLast(new ClientMsgHandler()); pipeline.addLast(new ClientLoginHandler()); //pipeline.addLast(new MsgEncoder()); pipeline.addLast(new LoginEncoder()); } }); ChannelFuture cf = bs.connect().sync(); log.info("啟動成功{}", cf.channel().localAddress().toString()); // Scanner scanner = new Scanner(System.in); cf.channel().closeFuture().sync(); } finally { clientGroup.shutdownGracefully().sync(); } } }客戶端代碼
package com.ql.handler; import io.netty.channel.*; import lombok.extern.slf4j.Slf4j; import protocol.system.request.LoginRequest; import protocol.system.response.LoginResponse; import java.util.Scanner; /** * @description: * @author: quliang * @create: 2022-12-06 22:16 **/ @Slf4j @ChannelHandler.Sharable public class ClientLoginHandler extends SimpleChannelInboundHandler<LoginResponse> { private Scanner scanner = new Scanner(System.in); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("客戶端:{} 通道開啟!", ctx.channel().localAddress().toString()); login(ctx); } /** * 登錄方法 * @param ctx */ private void login(ChannelHandlerContext ctx) { LoginRequest request = new LoginRequest("123", "123"); ctx.channel().writeAndFlush(request); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("客戶端: {} 讀取資料 {}", ctx.channel().localAddress().toString(), msg.toString()); } @Override protected void channelRead0(ChannelHandlerContext ctx, LoginResponse response) throws Exception { log.info("客戶端: {} 讀取資料 {}", ctx.channel().localAddress().toString(), response.toString()); String msg = response.getMsg(); log.info("========{}", msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("...............資料接收-完畢..............."); } }客戶端登錄處理器

我們是怎麼通過這個專案來實作不同協議編解碼?
其實也不難,我們仔細看MsgDecoder、LoginDecoder這兩個類中任意一個的代碼,其中有個巧妙的操作就是使用try-catch:
只要解碼器無法解碼而發生例外,就重置讀取位元組索引,並把資料傳遞到下一個解碼器,直到傳遞到正確的解碼器為止。不過這種做法是為了兼容多種協議,
解碼例外也會讓服務端性能有所下降,取舍之間必有得失。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/542997.html
標籤:Java
上一篇:java基礎:java基礎語法
下一篇:java基礎:陣列
