問題描述
最近需要用netty實作一個中間件通信,開始為了先快速把客戶端和服務端通信的demo完成,只是采用了字串的編解碼方式(StringEncoder,StringDecoder),客戶端和服務端可以正常互發資料,一切運行正常,
但是字串的編解碼并不適合業務物體類的傳輸,為了快速實作物體類傳輸,所以決定采用jboss-marshalling-serial序列化方式先完成demo,但是在客戶端發送資料時,服務端卻無法收到資料,客戶端控制臺也沒有任何例外資訊,
先看整個demo實作代碼,再查找問題原因,(先提前說明,示例代碼是完全正確無邏輯bug的)
pom依賴
<dependencies>
<!--只是用到了里面的日志框架-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.56.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.10.Final</version>
</dependency>
</dependencies>
jboss-marshalling-serial序列化工具類
- netty提供的Marshalling編解碼器采用訊息頭和訊息體的方式
- JBoss Marshalling是一個Java物件序列化包,對jdk默認的序列化框架進行優化,但又保持跟Serializable介面的兼容,同時增加了一些可呼叫的引數和附加的特性
- 經過測驗發現序列化后的流較protostuff,MessagePack還是比較大的,
- 序列化和反序列化的類必須是同一個類,否則拋出例外: io.netty.handler.codec.DecoderException: java.lang.ClassNotFoundException: com.bruce.netty.rpc.entity.UserInfo
public final class MarshallingCodeFactory {
private static final InternalLogger log = InternalLoggerFactory.getInstance(MarshallingCodeFactory.class);
/** 創建Jboss marshalling 解碼器 */
public static MyMarshallingDecoder buildMarshallingDecoder() {
//引數serial表示創建的是Java序列化工廠物件,由jboss-marshalling-serial提供
MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
DefaultUnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
return new MyMarshallingDecoder(provider, 1024);
}
/** 創建Jboss marshalling 編碼器 */
public static MarshallingEncoder buildMarshallingEncoder() {
MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
DefaultMarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
return new MarshallingEncoder(provider);
}
public static class MyMarshallingDecoder extends MarshallingDecoder {
public MyMarshallingDecoder(UnmarshallerProvider provider, int maxObjectSize) {
super(provider, maxObjectSize);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
log.info("讀取資料長度:{}", in.readableBytes());
return super.decode(ctx, in);
}
}
}
服務端代碼實作
服務端業務處理器:(真實場景中不要在io執行緒執行耗時業務邏輯處理)
@ChannelHandler.Sharable
public class SimpleServerHandler extends ChannelInboundHandlerAdapter {
private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleServerHandler.class);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("handlerAdded" + this.hashCode());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("server channelRead:{}", msg);
ctx.channel().writeAndFlush("hello netty");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof java.io.IOException) {
log.warn("client close");
} else {
cause.printStackTrace();
}
}
}
服務端啟動類
public class NettyServer {
private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyServer.class);
public static void main(String[] args) throws Exception {
EventLoopGroup acceptGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
Class<? extends ServerSocketChannel> serverSocketChannelClass = NioServerSocketChannel.class;
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(acceptGroup, workerGroup)
.channel(serverSocketChannelClass)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, false) //默認為false
.handler(new LoggingHandler())
.childHandler(new CustomCodecChannelInitializer());
try {
//sync() 將異步變為同步,系結到8088埠
ChannelFuture channelFuture = bootstrap.bind(8088).sync();
log.info("server 啟動成功");
} catch (Exception e) {
e.printStackTrace();
}
Thread serverShutdown = new Thread(() -> {
log.info("執行jvm ShutdownHook, server shutdown");
acceptGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
});
//注冊jvm ShutdownHook,jvm退出之前關閉服務資源
Runtime.getRuntime().addShutdownHook(serverShutdown);
}
static class CustomCodecChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
pipeline.addLast(new SimpleServerHandler());
}
}
}
客戶端代碼實作
客戶端業務處理器
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("client receive:{}", msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
}
客戶端啟動類
public class NettyClient {
private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);
public static void main(String[] args) {
EventLoopGroup workerGroup = new NioEventLoopGroup();
Class<? extends SocketChannel> socketChannelClass = NioSocketChannel.class;
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(socketChannelClass)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
.handler(new CustomCodecChannelInitializer());
Channel clientChannel;
try {
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);
//同步等待連接建立成功, 這里示例代碼, 可以認為是一定會連接成功
boolean b = channelFuture.awaitUninterruptibly(10, TimeUnit.SECONDS);
clientChannel = channelFuture.channel();
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
UserInfo userInfo = new UserInfo("bruce", 18);
log.info("send user info");
//連接成功后發送資料
send(clientChannel, userInfo);
}
//實際上這個地方會永遠阻塞等待
clientChannel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
static void send(Channel channel, UserInfo data) {
//連接成功后發送資料
ChannelFuture channelFuture1 = channel.writeAndFlush(data);
}
static class CustomCodecChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
pipeline.addLast(new SimpleClientHandler());
}
}
}
物體類UserInfo,
public class UserInfo {
private String username;
private int age;
public UserInfo() {
}
public UserInfo(String username, int age) {
this.username = username;
this.age = age;
}
//getter / setter 省略
}
先啟動服務端,再啟動客戶端可以在idea控制臺發現:
服務端和客戶端建立了連接,客戶端在發送資料,
但
是
服
務
端
卻
沒
有
收
到
,
并
且
控
制
臺
沒
有
任
何
異
常
信
息
\color{#FF3030}{但是服務端卻沒有收到,并且控制臺沒有任何例外資訊}
但是服務端卻沒有收到,并且控制臺沒有任何異常信息


既然沒有例外,只能先在客戶端斷點,確認客戶端是否正常,根據經驗直接查看MarshallingEncoder的編碼方法MarshallingEncoder#encode,debug執行先確認UserInfo物件有沒有被正確序列化,

在執行到marshaller.writeObject(msg)時出現了例外,

繼續跟進斷點會進入catch中,顯示java.io.NotSerializableException,(腦中出現一句話:我大意了,沒有…)已經可以知道UserInfo類沒有繼承序列化介面java.io.Serializable而拋出例外,UserInfo只需要繼承java.io.Serializable就可以正常向客戶端發送資料,
但是為什么控制臺沒有拋出例外呢 !?
繼續跟進斷點NotSerializableException被包裝在io.netty.handler.codec.EncoderException中拋出,序列化的buf也在finally中被釋放,而EncoderException會被AbstractChannelHandlerContext#invokeWrite0方法的catch陳述句中被處理,
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
// Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
// false.
PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
}
最侄訓執行到io.netty.util.concurrent.DefaultPromise#setValue0,主要目的就是為了記錄這個例外資訊,然后檢查是否有GenericFutureListener監聽這次發送請求的結果,如果有Listener則在nio執行緒中回呼監聽器方法,
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
private synchronized boolean checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
return listeners != null;
}
然而筆者的示例中并沒有設定GenericFutureListener,checkNotifyWaiters方法回傳的是false,不會執行notifyListeners();方法,所以整個例外被吞沒,而Promise#tryFailure方法最侄訓傳true,
再看方法io.netty.util.internal.PromiseNotificationUtil#tryFailure,雖然也是會處理Throwable,但是只在Promise#tryFailure回傳false并且logger不為null時執行,所以這里也不會列印出日志,
public static void tryFailure(Promise<?> p, Throwable cause, InternalLogger logger) {
if (!p.tryFailure(cause) && logger != null) {
Throwable err = p.cause();
if (err == null) {
logger.warn("Failed to mark a promise as failure because it has succeeded already: {}", p, cause);
} else if (logger.isWarnEnabled()) {
logger.warn(
"Failed to mark a promise as failure because it has failed already: {}, unnotified cause: {}",
p, ThrowableUtil.stackTraceToString(err), cause);
}
}
}
如何列印出這些例外資訊呢?
方案1 (異步處理)
在資料發送過后,給ChannelFuture添加監聽器,用于監聽此次發送的結果,當出現例外時,對例外進行處理,
static void send(Channel channel, UserInfo data) {
//連接成功后發送資料
ChannelFuture channelFuture1 = channel.writeAndFlush(data);
channelFuture1.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
cause.printStackTrace();
}
}
});
}
方案2 (不推薦,根據業務決定)
在資料發送過后,同步等待發送結果,判斷是否存在例外,
static void send(Channel channel, UserInfo data) {
//連接成功后發送資料
ChannelFuture channelFuture1 = channel.writeAndFlush(data);
while (!channelFuture1.isDone()) {
try {
//超時時間示例值,根據業務決定
boolean notTimeout = channelFuture1.await(50);
} catch (Exception e) {
log.warn(e.getMessage());
}
}
Throwable cause = channelFuture1.cause();
if (cause != null) {
cause.printStackTrace();
}
}
沒有監聽ChannelFuture,例外就被隱藏是否合理呢?
這個問題見仁見智,對筆者有點代碼潔癖來說,這里至少是可有優化一下的,不至于讓開發者耗費時間去查找丟失的例外資訊,優化邏輯也簡單,在io.netty.util.concurrent.DefaultPromise#setFailure0中,如果既沒有listeners也沒有await等待時,則列印例外資訊,
修DefaultPromise改代碼如下:
private boolean setFailure0(Throwable cause) {
if (listeners == null && waiters == 0) {
logger.error("cause:", cause);
}
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
請看pr: https://github.com/netty/netty/pull/10917
總結
這里只是通過編碼時沒有注意到的細節(物體類沒有實作序列化介面),來分析為什么例外被吞及處理方案,可以通過例外堆疊快速定位問題,但如果想要沒有例外,則只能根據例外做相應的修改了,同時可以讓我們更加了解netty的實作細節,
最后還是建議通過channel發送資料后,對回傳的ChannelFuture做是否存在例外判斷以及處理,防止出現類似的情況,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/247195.html
標籤:java
