Netty是Trustin Lee在2004年開發的一款高性能的網路應用程式框架,相比于JDK自帶的NIO,Netty做了相當多的增強,且隔離了jdk nio的實作細節,API也比較友好,還支持流量整形等高級特性,在我們常見的一些開源專案中已經普遍的應用到了Netty,比如Dubbo、Elasticsearch、Zookeeper等,
Netty的具體開發
提示:因代碼相對較多,這里只展示其主要部分,至于專案中用到的編解碼器、工具類,請直接拉到最后下載原始碼!也歡迎順手給個Star~
需要的依賴
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jmx</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.29.Final</version>
</dependency>
Client端代碼
package com.example.nettydemo.client;
import com.example.nettydemo.client.codec.*;
import com.example.nettydemo.client.codec.dispatcher.OperationResultFuture;
import com.example.nettydemo.client.codec.dispatcher.RequestPendingCenter;
import com.example.nettydemo.client.codec.dispatcher.ResponseDispatcherHandler;
import com.example.nettydemo.common.RequestMessage;
import com.example.nettydemo.common.string.StringOperation;
import com.example.nettydemo.util.IdUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import javax.net.ssl.SSLException;
import java.util.concurrent.ExecutionException;
public class Client {
public static void main(String[] args) throws InterruptedException, ExecutionException, SSLException {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
//客戶端連接服務器最大允許時間,默認為30s
bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000); //10s
NioEventLoopGroup group = new NioEventLoopGroup();
try {
bootstrap.group(group);
RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new FrameDecoder());
pipeline.addLast(new FrameEncoder());
pipeline.addLast(new ProtocolEncoder());
pipeline.addLast(new ProtocolDecoder());
pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
pipeline.addLast(new OperationToRequestMessageEncoder());
// pipeline.addLast(loggingHandler);
}
});
//連接服務
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888);
//因為future是異步執行,所以需要先連接上后,再進行下一步操作
channelFuture.sync();
long streamId = IdUtil.nextId();
/**
* 發送資料測驗,按照定義的規則組裝資料
*/
// OrderOperation orderOperation = new OrderOperation(1001, "你好啊,hi");
RequestMessage requestMessage = new RequestMessage(streamId, new StringOperation(1234, "你好啊,hi"));
//將future放入center
OperationResultFuture operationResultFuture = new OperationResultFuture();
requestPendingCenter.add(streamId, operationResultFuture);
//發送訊息
for (int i = 0; i < 10; i++) {
channelFuture.channel().writeAndFlush(requestMessage);
}
//阻塞等待結果,結果來了之后會呼叫ResponseDispatcherHandler去set結果
// OperationResult operationResult = operationResultFuture.get();
// //將結果列印
// System.out.println("回傳:"+operationResult);
channelFuture.channel().closeFuture().get();
} finally {
group.shutdownGracefully();
}
}
}
Server端代碼
package com.example.nettydemo.server;
import com.example.nettydemo.server.codec.FrameDecoder;
import com.example.nettydemo.server.codec.FrameEncoder;
import com.example.nettydemo.server.codec.ProtocolDecoder;
import com.example.nettydemo.server.codec.ProtocolEncoder;
import com.example.nettydemo.server.handler.MetricsHandler;
import com.example.nettydemo.server.handler.ServerIdleCheckHandler;
import com.example.nettydemo.server.handler.ServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
import lombok.extern.slf4j.Slf4j;
import javax.net.ssl.SSLException;
import java.security.cert.CertificateException;
import java.util.concurrent.ExecutionException;
/**
* netty server 入口
*/
@Slf4j
public class Server {
public static void main(String... args) throws InterruptedException, ExecutionException, CertificateException, SSLException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
//設定channel模式,因為是server所以使用NioServerSocketChannel
serverBootstrap.channel(NioServerSocketChannel.class);
//最大的等待連接數量
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
//設定是否啟用 Nagle 演算法:用將小的碎片資料連接成更大的報文 來提高發送效率,
//如果需要發送一些較小的報文,則需要禁用該演算法
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
//設定netty自帶的log,并設定級別
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
//thread
//用戶指定執行緒名
NioEventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
NioEventLoopGroup workGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
UnorderedThreadPoolEventExecutor businessGroup = new UnorderedThreadPoolEventExecutor(10, new DefaultThreadFactory("business"));
//只能使用一個執行緒,因GlobalTrafficShapingHandler比較輕量級
NioEventLoopGroup eventLoopGroupForTrafficShaping = new NioEventLoopGroup(0, new DefaultThreadFactory("TS"));
try {
//設定react方式
serverBootstrap.group(bossGroup, workGroup);
//metrics
MetricsHandler metricsHandler = new MetricsHandler();
//trafficShaping流量整形
//long writeLimit 寫入時控制, long readLimit 讀取時控制 具體設定看業務修改
GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(eventLoopGroupForTrafficShaping, 10 * 1024 * 1024, 10 * 1024 * 1024);
//log
LoggingHandler debugLogHandler = new LoggingHandler(LogLevel.DEBUG);
LoggingHandler infoLogHandler = new LoggingHandler(LogLevel.INFO);
//設定childHandler,按執行順序放
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("debugLog", debugLogHandler);
pipeline.addLast("tsHandler", globalTrafficShapingHandler);
pipeline.addLast("metricHandler", metricsHandler);
pipeline.addLast("idleHandler", new ServerIdleCheckHandler());
pipeline.addLast("frameDecoder", new FrameDecoder());
pipeline.addLast("frameEncoder", new FrameEncoder());
pipeline.addLast("protocolDecoder", new ProtocolDecoder());
pipeline.addLast("protocolEncoder", new ProtocolEncoder());
pipeline.addLast("infoLog", infoLogHandler);
//對flush增強,減少flush次數犧牲延遲增強吞吐量
pipeline.addLast("flushEnhance", new FlushConsolidationHandler(10, true));
//為業務處理指定單獨的執行緒池
pipeline.addLast(businessGroup, new ServerProcessHandler());//businessGroup,
}
});
//系結埠并阻塞啟動
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
businessGroup.shutdownGracefully();
eventLoopGroupForTrafficShaping.shutdownGracefully();
}
}
}
最后
以上介紹了Netty的基本用法,在代碼中也做了一部分的關鍵注釋,但可能還會有許多不足,也不可能滿足所有人的要求,大家可根據自己的實際需求去改造此專案,附上原始碼地址netty原始碼
持續學習,記錄點滴,更多文章請訪問 文章首發
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/5991.html
標籤:架構設計
上一篇:springcloud 專案原始碼 微服務 分布式 Activiti6 作業流 vue.js html 跨域 前后分離
下一篇:聊聊應用系統架構的0到1
