前言
本周繼續學習尼恩編著的《Netty、Redis、ZooKeeper高并發實戰》,一些資源也貼在這里,自己以后想看還可以找到,這個是在博客園的一個入口https://www.cnblogs.com/crazymakercircle/p/9904544.html,
這周主要學習了Netty客戶端和服務端通信,書是由淺入深的在進行,從Socket NOI通信到 Reactor反應器模式,再到Netty框架,示例代碼都在https://gitee.com/crazymaker/netty_redis_zookeeper_source_code.git 中可以看到,書結合源代碼,自己在動手試驗一下,感徑訓是有些識訓, 今天的示例代碼就是實踐出一個客戶端和服務端傳遞protobuf的例子,
Netty 服務端
先來看一下服務端代碼,
@Slf4j
public class ProtoBufServer {
private final int serverPort;
ServerBootstrap b = new ServerBootstrap();
public ProtoBufServer(int port) {
this.serverPort = port;
}
public void runServer() {
//創建reactor 執行緒組
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//1 設定reactor 執行緒組
b.group(bossLoopGroup, workerLoopGroup);
//2 設定nio型別的channel
b.channel(NioServerSocketChannel.class);
//3 設定監聽埠
b.localAddress(serverPort);
//4 設定通道的引數
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 裝配子通道流水線
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有連接到達時會創建一個channel
protected void initChannel(SocketChannel ch) throws Exception {
// pipeline管理子通道channel中的Handler
// 向子channel流水線添加3個handler處理器
// protobufDecoder僅僅負責編碼,并不支持讀半包,所以在之前,一定要有讀半包的處理器,
// 有三種方式可以選擇:
// 使用netty提供ProtobufVarint32FrameDecoder
// 繼承netty提供的通用半包處理器 LengthFieldBasedFrameDecoder
// 繼承ByteToMessageDecoder類,自己處理半包
// 半包的處理
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
// 需要解碼的目標類
ch.pipeline().addLast(new ProtobufDecoder(MarketPriceProto.MarketPrice.getDefaultInstance()));
ch.pipeline().addLast(new ProtobufBussinessHandler());
}
});
// 6 開始系結server
// 通過呼叫sync同步方法阻塞直到系結成功
ChannelFuture channelFuture = b.bind().sync();
log.info(" 服務器啟動成功,監聽埠: " +
channelFuture.channel().localAddress());
// 7 等待通道關閉的異步任務結束
// 服務監聽通道會一直等待通道關閉的異步任務結束
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 8 優雅關閉EventLoopGroup,
// 釋放掉所有資源包括創建的執行緒
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
}
//服務器端業務處理器
static class ProtobufBussinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MarketPriceProto.MarketPrice protoMsg = (MarketPriceProto.MarketPrice) msg;
//經過pipeline的各個decoder,到此Person型別已經可以斷定
log.info("收到一個 MsgProtos.Msg 資料包 =》");
log.info("protoMsg.getId():=" + protoMsg.getId());
log.info("protoMsg.getClose():=" + protoMsg.getClose());
}
}
public static void main(String[] args) throws InterruptedException {
int port = SERVER_PORT;
new ProtoBufServer(port).runServer();
}
}
代碼中有注釋解釋,我在這里加一下說明
代碼中有兩個EventLoopGroup bossLoopGroup和EventLoopGroup workerLoopGroup,使用兩個是什么原因呢? 一個是負責處理連接監聽事件, 一個負責處理資料IO事件和Handler業務處理,通俗點解釋就是一個負責接客,一個負責服務客戶,如果只有一個人就會忙不過來,讓后面的人等很久,
b.childHandler這個就是我們具體的如何處理接收到的訊息,他們都繼承ChannelInboundHandlerAdapter,通過PipeLine把訊息進行處理,我們從通道里面拿到的都是位元組碼,那么要轉成我們需要的Protobuf類,就需要用到這些處理類
前兩個處理類ProtobufVarint32FrameDecoder和ProtobufDecoder都是Netty提供的,一個是為了解決半包問題,半包問題是因為在TCP傳輸的時候對資料包進行了拆包或者分包,收到的時候如果直接處理,就會有問題,需要我們在應用層進行二次封裝, 在這個示例中如果我們不使用ProtobufVarint32FrameDecoder,客戶端也不用,那么就會出現有的可以決議出來,有的報錯的情況: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
可以用搬家來打比分,我們把一個家從一個地方搬到另外一個地方,還要求布局一樣,汽車運輸的時候可能要分好幾次,那么我們可以先記住位置,然后隨便裝車搬過去,過去后先暫存,再按記錄的位置進行還原,這樣來保證一模一樣,
ProtobufBussinessHandler 是我們自定義的Handler繼承了ChannelInboundHandlerAdapter,通過它我們拿到channel里面的資料,轉換成我們的具體的Protobuf物件,這里只是簡單的列印出來, 如果要把它繼續傳下去,需要呼叫 super.channelRead(ctx,msg)傳遞下去,
客戶端
@Slf4j
public class ProtoBufScanClient {
private int serverPort;
private String serverIp;
Bootstrap b = new Bootstrap();
public ProtoBufScanClient(String ip, int port) {
this.serverPort = port;
this.serverIp = ip;
}
public void runClient() {
//創建reactor 執行緒組
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//1 設定reactor 執行緒組
b.group(workerLoopGroup);
//2 設定nio型別的channel
b.channel(NioSocketChannel.class);
//3 設定監聽埠
b.remoteAddress(serverIp, serverPort);
//4 設定通道的引數
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 裝配通道流水線
b.handler(new ChannelInitializer<SocketChannel>() {
//初始化客戶端channel
protected void initChannel(SocketChannel ch) throws Exception {
// 客戶端channel流水線添加2個handler處理器
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
}
});
ChannelFuture f = b.connect();
f.addListener((ChannelFuture futureListener) ->
{
if (futureListener.isSuccess()) {
log.info("EchoClient客戶端連接成功!");
} else {
log.info("EchoClient客戶端連接失敗!");
}
});
// 阻塞,直到連接完成
f.sync();
Channel channel = f.channel();
Scanner scanner = new Scanner(System.in);
log.info("請輸入發送內容:");
GenericFutureListener sendCallBack = new GenericFutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()) {
log.info("發送成功!");
} else {
log.info("發送失敗!");
}
}
};
while (scanner.hasNext()) {
//獲取輸入的內容
String next = scanner.next();
String[] values = next.split(",");
if(values.length != 5)
{
log.info("格式不正確!");
}
else {
MarketPriceProto.MarketPrice msg = build(values);
ChannelFuture writeAndFlushFuture = channel.writeAndFlush(msg);
writeAndFlushFuture.addListener(sendCallBack);
}
log.info("請輸入發送內容:");
}
channel.flush();
// 7 等待通道關閉的異步任務結束
// 服務監聽通道會一直等待通道關閉的異步任務結束
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 優雅關閉EventLoopGroup,
// 釋放掉所有資源包括創建的執行緒
workerLoopGroup.shutdownGracefully();
}
}
//構建ProtoBuf物件
public MarketPriceProto.MarketPrice build(String[] values) {
MarketPriceProto.MarketPrice.Builder builder = MarketPriceProto.MarketPrice.newBuilder();
builder.setId(values[0]);
builder.setOpen(Double.valueOf(values[1]));
builder.setHigh(Double.valueOf(values[2]));
builder.setLow(Double.valueOf(values[3]));
builder.setClose(Double.valueOf(values[4]));
return builder.build();
}
public static void main(String[] args) throws InterruptedException {
int port = SERVER_PORT;
String ip = SOCKET_SERVER_IP;
new ProtoBufScanClient(ip, port).runClient();
}
}
Netty客戶端代碼和服務端很接近,這里它只用了一個執行緒組,客戶端只有它一個使用,和服務端模式不一樣,一個就夠了,
客戶端讀取控制臺輸入資料,然后構造成MarketPriceProto.MarketPrice,它的Proto定義如下
// [開始宣告]
syntax = "proto3";
//定義protobuf的包名稱空間
package com.ken.netty.protocol;
// [結束宣告]
// [開始 java 選項配置]
option java_package = "com.ken.netty.protocol";
option java_outer_classname = "MarketPriceProto";
// [結束 java 選項配置]
// [開始 訊息定義]
message MarketPrice {
string id = 1;
double open = 2;
double high = 3;
double low = 4;
double close = 5;
}
channel.writeAndFlush(msg), 這里我們直接把Protobuf物件傳遞進channel, ProtobufEncoder會對它進行編碼,
總結
代碼比較簡單,這是在有書的幫助和源代碼幫助下實作的,不通過書這個也不是這么容易理解的, 作者還實作了一個及時通信的例子https://gitee.com/crazymaker/SimpleCrayIM, 也需要花些時間來學習一下,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/548203.html
標籤:其他
上一篇:計算機專業規劃
下一篇:從pcap檔案中提取pcma音頻
