netty是基于NIO(同步非阻塞)開發的網路通信框架;對比傳統BIO(阻塞IO),其并發性能有很大提升,而dubbo的底層就是使用netty作為網路框架,本文就手寫簡單的基于netty的RPC框架,
1 設計步驟
定義一個通用介面,作為服務提供者(provider)和消費者(consumer)之間的操作紐帶
創建一個服務提供者,實作通用介面,并回傳處理結果;網路方面監聽消費者請求
創建一個服務消費者,通過代理模式呼叫遠程服務介面
1.1 程式目錄

1.2 定義一個通用介面
public interface TestService {
String hello(String msg);
}
2 服務提供者模塊
2.1 介面實作
public class TestServiceImpl implements TestService {
@Override
public String hello(String msg) {
System.out.println("TestServiceImpl中hello被呼叫,引數:" + msg);
return "你好客戶端,我已經收到你的訊息:" + msg;
}
}
2.2 定義一個服務啟動類
public class ServerBootStrap {
public static void main(String[] args) {
NettyServer nettyServer = new NettyServer(40004);
nettyServer.init();
}
}
2.3 創建netty服務端
**此步驟是netty常規服務端創建方式**
public class NettyServer {
private int port;
public NettyServer(int port) {
this.port = port;
}
public void init() {
//創建一個用于接收連接的執行緒組,引數代表執行緒個數
EventLoopGroup boss = new NioEventLoopGroup(1);
//創建處理操作時間的執行緒組,沒有引數netty會默認執行緒為內核數*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture ch = serverBootstrap.bind(port).sync();
ch.channel().closeFuture().sync();
} catch (Exception ex) {
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
2.4 服務端業務處理Handler
channelRead0方法用于接收客戶端傳來的資訊,同時對資料進行校驗
校驗成功后,截取有效引數呼叫服務介面
public class NettyServerHandler extends SimpleChannelInboundHandler {
private static String head = "dubbo#TestServie#";
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到客戶端資料:" + msg);
if (msg.toString().startsWith(head)) {
TestService testService = new TestServiceImpl();
String result = testService.hello(msg.toString().substring(head.length()));
ctx.writeAndFlush(result);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("接收到連接請,channelActive被呼叫:" + ctx.channel().remoteAddress());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("讀取完成");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("斷開連接");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("拋出例外");
ctx.channel().close();
}
}
3 消費者模塊
3.1 創建消費者啟動程式
public class ClientBootStrap {
public static void main(String[] args) {
NettyClient nettyClient = new NettyClient("127.0.0.1", 40004);
String head = "dubbo#TestServie#";
// nettyClient.init();
TestService service = (TestService) nettyClient.getBean(TestService.class, head);
String result = service.hello("你好,我是服務消費者");
System.out.println("呼叫回傳了結果:" + result);
}
}
3.2 創建消費者網路通信模塊
通過代理模式呼叫
public class NettyClient {
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler nettyClientHandler;
private String host;
private int port;
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
}
//撰寫一個代理 請求服務提供者介面
public Object getBean(final Class<?> serviceClass, final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{serviceClass}, ((proxy, method, args) -> {
System.out.println("開始執行代理");
if (nettyClientHandler == null)
init();
System.out.println("設定代理引數");
nettyClientHandler.setPara(providerName + args[0].toString());
return executorService.submit(nettyClientHandler).get();
}));
}
private static void init() {
System.out.println("開始執行init方法");
nettyClientHandler = new NettyClientHandler();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(nettyClientHandler);
}
});
bootstrap.connect("127.0.0.1", 40004).sync();
// future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// worker.shutdownGracefully();
// System.out.println("執行結束");
}
}
}
3.3 創建消費者業務處理handler
成員變數para: 為呼叫遠程介面服務的引數
成員變數result::為呼叫遠程服務器介面回傳結果
需要注意的是該handller實作了Callable介面中call()方法;
執行步驟為:
1、連接建立成功后執行channelActive方法
2、執行call方法發送資料到服務端,同時阻塞執行緒
3、服務端回傳結果后執行channelRead0方法,喚醒執行緒,
4、執行call方法中wait()后面的步驟,回傳結果
public class NettyClientHandler extends SimpleChannelInboundHandler implements Callable {
private ChannelHandlerContext context;
private String para;
private String result;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.context = ctx;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("斷開連接");
}
@Override
public synchronized Object call() throws Exception {
System.out.println("發送call訊息:" + para);
context.writeAndFlush(para);
wait();
return result;
}
@Override
protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
result = o.toString();
System.out.println("收到服務端的回傳訊息:" + o);
notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("客戶端發生例外");
}
void setPara(String str) {
this.para = str;
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/398679.html
標籤:java
