第1章 訪問快取服務器程序(hget)
架構在jedis外層封裝了一個客戶端ClusterNativeClient,在這個類中,提供了很多訪問redis的方法,包括:hget、hset、hdel等,即成為前臺業務代碼和jedis進行互動的橋梁。
我們以查詢操作為例,詳細描述一下jedis的實作程序。
1.1 原始碼分析
1.1.1 ClusterNativeClient中hget方法:
public String hget(String key, String field)
{
return pool.hget(key, field);
}
說明:
Pool是快取初始化時建立的,我們后面詳細講。
public static void init(Properties props)
{
String[] servers = props.getProperty("host").split(",");
HashSet nodes = new HashSet();
for (String s : servers) {
String[] ipAndPort = s.split(":");
String ip = ipAndPort[0];
int port = Integer.valueOf(ipAndPort[1]).intValue();
nodes.add(new HostAndPort(ip, port));
}
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(Integer.parseInt(props.getProperty("maxIdle")));
config.setMaxTotal(Integer.parseInt(props.getProperty("maxTotal")));
pool = new JedisCluster(nodes, config);
}
說明:
Pool的建立即呼叫了JedisCluster中hget方法。
1.1.2 JedisCluster中hget方法
public String hget(String key, String field)
{
return (String)new JedisClusterCommand(this.connectionHandler, this.maxRedirections, key, field)
{
public String execute(Jedis connection) {
return connection.hget(this.val$key, this.val$field);
}
}
.run(key);
}
說明:
該hget方法先new了一個JedisClusterCommand物件,該物件中的run方法為新建jedis連接的程序,即通過key獲取slot,再獲取該slot所在的節點進行連接。Execute方法則通過該連接向目標節點發送命令,以獲取結果。通過該方法可以呼叫BinaryJedis中的hget()。
1.1.3 BinaryJedis中hget方法
public byte[] hget(byte[] key, byte[] field)
{
checkIsInMulti();
this.client.hget(key, field);
return this.client.getBinaryBulkReply();
}
說明:
BinaryJedis是jedis的二進制實作,即引數都進行二進制轉化。在這一層呼叫了client的hegt方法,即向底層發送hget命令,getBinaryBulkReply()為獲取命令執行后的結果再回傳給上層物件。
1.1.4 BinaryClient中hget方法
public void hget(byte[] key, byte[] field) {
sendCommand(Protocol.Command.HGET, new byte[][] { key, field });
}
說明:
BinaryClient為Client的二進制實作。在這一層呼叫了父類Connection的sendCommand方法,通過字面可以猜出是向底層客戶端發送命令的意思。
1.1.5 Connection中sendCommand方法
protected Connection sendCommand(ProtocolCommand cmd, byte[][] args) {
try {
connect();
Protocol.sendCommand(this.outputStream, cmd, args);
this.pipelinedCommands += 1;
return this;
}
catch (JedisConnectionException ex) {
this.broken = true;
}throw ex;
}
說明:
在Connection的sendCommand方法中先對目標節點進行了socket連接,connect()方法如下。然后呼叫Protocol的sendCommand方法,注意引數有outputStream,可以看出是要以輸出流的形式給底層發送命令。
public void connect() {
if (!isConnected())
try {
this.socket = new Socket();
this.socket.setReuseAddress(true);
this.socket.setKeepAlive(true);
this.socket.setTcpNoDelay(true);
this.socket.setSoLinger(true, 0);
this.socket.connect(new InetSocketAddress(this.host, this.port), this.connectionTimeout);
this.socket.setSoTimeout(this.soTimeout);
this.outputStream = new RedisOutputStream(this.socket.getOutputStream());
this.inputStream = new RedisInputStream(this.socket.getInputStream());
} catch (IOException ex) {
this.broken = true;
throw new JedisConnectionException(ex);
}
}
1.1.6 Protocol中sendCommand方法
private static void sendCommand(RedisOutputStream os, byte[] command, byte[][] args)
{
try {
os.write(42);
os.writeIntCrLf(args.length + 1);
os.write(36);
os.writeIntCrLf(command.length);
os.write(command);
os.writeCrLf();
for (byte[] arg : args) {
os.write(36);
os.writeIntCrLf(arg.length);
os.write(arg);
os.writeCrLf();
}
} catch (IOException e) {
throw new JedisConnectionException(e);
}
}
說明:
Protocol的sendCommand方法中可以更直觀的看出,這是利用輸出流向指定的連接執行寫操作。
以上,即為jedis實作的hget方法,其他實作方法類似。
1.2 總結
和Redis Server通信的協議規則都在redis.clients.jedis.Protocol這個類中,主要是通過對RedisInputStream和RedisOutputStream對讀寫操作來完成。命令的發送都是通過redis.clients.jedis.Protocol的sendCommand來完成的,就是對RedisOutputStream寫入位元組流,回傳的資料是通過讀取RedisInputStream 進行決議處理后得到的。
和Redis Sever的Socket通信是由 redis.clients.jedis.Connection 實作的。Connection 中維護了一個底層Socket連接和自己的I/O Stream 還有Protocol,I/O Stream是在Connection中Socket建立連接后獲取并在使用時傳給Protocol的。
第2章 Jedis定位redis集群節點程序
前面提到JedisClusterCommand類的run方法為新建jedis連接的程序,即訪問節點定位程序,下面我們就來研究一下當一個客戶端請求過來的時候,jedis是如何定位其訪問的服務器節點的。
2.1 原始碼分析
2.1.1 JedisClusterCommand類的run方法
public T run(String key) {
if (key == null) {
throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
}
return runWithRetries(key, this.redirections, false, false);
}
說明:
Run方法很簡單就是呼叫了一下runWithRetries方法。
2.1.2 runWithRetries方法
private T runWithRetries(String key, int redirections, boolean tryRandomNode, boolean asking) {
if (redirections <= 0) {
throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
}
Jedis connection = null;
try
{
if (asking)
{
connection = (Jedis)this.askConnection.get();
connection.asking();
asking = false;
}
else if (tryRandomNode) {
connection = this.connectionHandler.getConnection();
} else {
Connection= = this.connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
}
Object localObject1 = execute(connection);
return localObject1;
}
catch (JedisConnectionException jce)
{
if (tryRandomNode)
{
throw jce;
}
releaseConnection(connection, true);
connection = null;
localObject2 = runWithRetries(key, redirections - 1, true, asking);
return localObject2;
}
catch (JedisRedirectionException jre)
{
if ((jre instanceof JedisAskDataException)) {
asking = true;
this.askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
} else if ((jre instanceof JedisMovedDataException))
{
this.connectionHandler.renewSlotCache();
} else {
throw new JedisClusterException(jre);
}
releaseConnection(connection, false);
connection = null;
Object localObject2 = runWithRetries(key, redirections - 1, false, asking);
return localObject2; } finally { releaseConnection(connection, false); } throw localObject3;
}
說明:
runWithRetries的實作程序主要分為三個部分:正常執行、拋JedisConnectionException例外后的執行和拋JedisRedirectionException例外后的執行。
1) 首先兩個引數asking和tryRandomNode在最初正常執行runWithRetries方法時都默認為false,即首次會執行:
Connection=this.connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
該方法首先通過key計算相應的slot。再通過slot定位到node。
2) 當試圖連接指定的節點發生連接例外JedisConnectionException時,會執行:
localObject2 = runWithRetries(key, redirections - 1, true, asking); 該方法中引數tryRandomNode為true,即執行
connection = this.connectionHandler.getConnection();
進行隨機訪問。
3) 當發生重定向例外JedisRedirectionException時(隨機訪問的節點,key值可能不在該節點,則需要重定向到key所在的節點,一般重定向發生例外都是指定的slot正在發生遷移所致),說明指定查詢key值所在的slot正在發生遷移,此時的slot是不接受任何命令的,但是若加上asking命令,則正常執行,因此會執行:
Object localObject2 = runWithRetries(key, redirections - 1, false, asking);
該方法中引數asking為true,即執行:
connection = (Jedis)this.askConnection.get();
connection.asking();
該方法會先發送一個asking命令給服務器端,然后再執行其他的命令。
2.1.3 根據slot獲取對應node程序
下面我們說一下獲取slot的程序,JedisClusterCRC16的getSlot方法程序:取CRC16校驗碼除以16384取模,16384為slot的個數。
獲取到slot后,要確定slot被分配到哪臺節點上了,然后讓客戶端連接指定的目標節點。該程序是通過JedisSlotBasedConnectionHandler的getConnectionFromSlot方法實作的。
2.1.3.1 getConnectionFromSlot方法
public Jedis getConnectionFromSlot(int slot)
{
JedisPool connectionPool = this.cache.getSlotPool(slot);
if (connectionPool != null)
{
return connectionPool.getResource();
}
return getConnection();
}
說明:
通過上面的方法可以看出當取出的節點為空時,要呼叫getConnection()進行隨機選取節點進行連接,否則就連接所得到的目標節點。
2.1.3.2 getConnection()方法
public Jedis getConnection()
{
List pools = getShuffledNodesPool();
for (JedisPool pool : pools) {
Jedis jedis = null;
try {
jedis = pool.getResource();
if (jedis == null)
{
continue;
}
String result = jedis.ping();
if (result.equalsIgnoreCase("pong")) return jedis;
pool.returnBrokenResource(jedis);
} catch (JedisConnectionException ex) {
if (jedis != null) {
pool.returnBrokenResource(jedis);
}
}
}
說明:
隨機選取目標節點主要實作方法為getShuffledNodesPool(),即將當前獲取到的節點進行洗牌,然后回圈洗牌后的所有節點,知道取出可以ping通的節點為止。
2.1.3.3 JedisClusterInfoCache中的getSlotPool方法
public JedisPool getSlotPool(int slot)
{
this.r.lock();
try {
JedisPool localJedisPool = (JedisPool)this.slots.get(Integer.valueOf(slot));
return localJedisPool; } finally { this.r.unlock(); } throw localObject;
}
說明:
該方法從全域變數slots中取出指定slot對應的node,slots里存盤了slot和node的對應關系,slots變數在快取初始化時對其進行賦值。
2.2 總結
2.2.1 正常情況
正常情況下,jedis會根據客戶端傳的key定位到slot,再跟據slot從slots中查到其分配的節點,然后向該節點發送請求并處理回傳結果。
2.2.2 例外情況1:連接例外
當指定的節點發生連接例外,例如突然當機,jedis會呼叫一個隨機選擇的方法getConnection(),然后向隨機選取后的節點發送請求,并處理其回傳結果。重新選擇的機會只有5次(jedis底層配置),否則會報Too many Cluster redirections錯誤。
2.2.3 例外情況2:slots為空
當slots為空,例如初始化時發生錯誤,那么根據slot是得不到指定的節點,則jedis會隨機選取一個節點,然后向其發送請求,并處理回傳結果。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/119268.html
標籤:其他
上一篇:開機老是出現這個 大神看看怎么破
下一篇:jedis原始碼分析(下)
