目錄
- 前言
- 相關引數
- hive.metastore.connect.retries 和 hive.metastore.failure.retries的區別
- 1. Hive與Metastore互動
- 2. 創建一個IMetaStoreClient物件
- 3. 每次呼叫IMetaStoreClient物件訪問Metastore時的底層實作邏輯
- 4. Metastore重連
- 總結
- 結論
前言
? 本文基于Hive2.1.0的Apache社區版,目的是為了探究Metastore和底層RDBMS和底層服務變更(例如版本升級、服務遷移等運維操作)對客戶端和用戶的影響,Hive提供了在客戶端對Metastore連接超時自動重連的容錯機制,允許我們通過調整引數配置調整停服時間限制,在規定時間內重啟服務對用戶無顯著影響,由于Metastore底層RDBMS我們采用的是業內通用的Mysql,因此后面以Mysql來替代RDBMS進行描述和驗證
相關引數
| 引數 | 默認值 | 說明 | 配置范圍 |
|---|---|---|---|
| hive.metastore.connect.retries | 3 | 客戶端建立與metastore連接時的重試次數 | Metastore客戶端,如CLI、Hiveserver2等 |
| hive.metastore.failure.retries | 1 | 客戶端訪問metastore的失敗重試次數 | Metastore客戶端,如CLI、Hiveserver2等 |
| hive.metastore.client.connect.retry.delay | 1s | Metastore客戶端重連/重試等待的時間 | Metastore客戶端,如CLI、Hiveserver2等 |
| hive.metastore.client.socket.timeout | 600s | Metastore客戶端socket超時時間,傳遞給底層Socket,超時之后底層Socket會自動斷開 | Metastore客戶端,如CLI、Hiveserver2等 |
| hive.metastore.client.socket.lifetime | 0 | socket存活時間,超時之后客戶端在下一次訪問Metastore時會主動斷開現有連接并重新建立連接,0表示不主動斷開 | Metastore客戶端,如CLI、Hiveserver2等 |
| hive.hmshandler.retry.attempts | 10 | 在JDO資料存盤出現錯誤后嘗試連接的次數 | Metastore |
| hive.hmshandler.retry.interval | 2000ms | JDO連接嘗試間隔,單位:ms | Metastore |
| hive.server2.thrift.client.connect.retry.limit | 1 | 客戶端建立與Hiveserver2連接的重試次數 | Hiveserver2的客戶端,如Beeline等 |
| hive.server2.thrift.client.retry.limit | 1 | 客戶端訪問Hiveserver2的失敗重試次數 | Hiveserver2的客戶端,如Beeline等 |
| hive.server2.thrift.client.retry.delay.seconds | 1s | Hiveserver2客戶端重連/重試等待的時間 | Hiveserver2的客戶端,如Beeline等 |
hive.metastore.connect.retries 和 hive.metastore.failure.retries的區別
? 為了弄清這兩個引數的區別,讓我們通過原始碼來確認一下,ps:為了方便閱讀后面會用......省略掉無關的代碼邏輯
1. Hive與Metastore互動
? CLI和Hiveserver2都是通過org.apache.hadoop.hive.ql.metadata.Hive類與Metastore的互動的,首先讓我們以createDatabase(Database, boolean)方法為例來看看具體的互動程序
/**
* Create a database
* @param db
* @param ifNotExist if true, will ignore AlreadyExistsException exception
* @throws AlreadyExistsException
* @throws HiveException
*/
public void createDatabase(Database db, boolean ifNotExist)
throws AlreadyExistsException, HiveException {
try {
getMSC().createDatabase(db);
} catch (AlreadyExistsException e) {
if (!ifNotExist) {
throw e;
}
} catch (Exception e) {
throw new HiveException(e);
}
}
/**
* @return the metastore client for the current thread
* @throws MetaException
*/
@LimitedPrivate(value = https://www.cnblogs.com/nortingHome/archive/2020/10/29/{"Hive"})
@Unstable
public synchronized IMetaStoreClient getMSC(
boolean allowEmbedded, boolean forceCreate) throws MetaException {
if (metaStoreClient == null || forceCreate) {
......
try {
metaStoreClient = createMetaStoreClient(allowEmbedded);
} catch (RuntimeException ex) {
......
}
......
}
return metaStoreClient;
}
? Hive類維護了一個IMetaStoreClient物件,通過getMSC()方法獲取,getMSC()方法在這里采用了懶漢模式去創建,接下來看下Hive是如何創建一個IMetaStoreClient物件的
2. 創建一個IMetaStoreClient物件
// org.apache.hadoop.hive.ql.metadata.Hive.java
private IMetaStoreClient createMetaStoreClient(boolean allowEmbedded) throws MetaException {
......
if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) {
return new SessionHiveMetaStoreClient(conf, hookLoader, allowEmbedded);
} else {
return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap,
SessionHiveMetaStoreClient.class.getName(), allowEmbedded);
}
}
? if后面的分支用于創建客戶端內置的本地Metastore,這主要用于開發除錯階段,因此我們只關注else后面的邏輯,即通過RetryingMetaStoreClient.getProxy方法創建一個IMetaStoreClient物件,RetryingMetaStoreClient.getProxy方法通過幾次簡單地呼叫多載函式,最終來到下面的方法
// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java
public static IMetaStoreClient getProxy(HiveConf hiveConf, Class<?>[] constructorArgTypes,
Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap,
String mscClassName) throws MetaException {
@SuppressWarnings("unchecked")
Class<? extends IMetaStoreClient> baseClass =
(Class<? extends IMetaStoreClient>)MetaStoreUtils.getClass(mscClassName);
RetryingMetaStoreClient handler =
new RetryingMetaStoreClient(hiveConf, constructorArgTypes, constructorArgs,
metaCallTimeMap, baseClass);
return (IMetaStoreClient) Proxy.newProxyInstance(
RetryingMetaStoreClient.class.getClassLoader(), baseClass.getInterfaces(), handler);
}
? 可以看到,這里利用Java代理機制創建并回傳了一個IMetaStoreClient的代理——RetryingMetaStoreClient,此后對IMetaStoreClient物件的呼叫都委托給RetryingMetaStoreClient.invoke 處理,接下來讓我們看下RetryingMetaStoreClient.invoke方法是如何處理用戶對IMetastoreClient物件的操作的
3. 每次呼叫IMetaStoreClient物件訪問Metastore時的底層實作邏輯
// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object ret = null;
int retriesMade = 0;
TException caughtException = null;
while (true) {
try {
reloginExpiringKeytabUser();
// 1. 檢查是否重連,重連的場景包括:
// a) 上一次回圈訪問Metastore例外,且例外型別支持自動重試訪問
// b) 底層socket超時,超時引數:hive.metastore.client.socket.lifetime
if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) {
base.reconnect();
lastConnectionTime = System.currentTimeMillis();
}
if (metaCallTimeMap == null) {
ret = method.invoke(base, args);
} else {
// need to capture the timing
long startTime = System.currentTimeMillis();
ret = method.invoke(base, args);
long timeTaken = System.currentTimeMillis() - startTime;
addMethodTime(method, timeTaken);
}
// 2. 訪問Metastore正常,回傳結果給上層呼叫并結束回圈,用戶不主動結束的情況下底層與Metastore的連接持續保持著
break;
// 3. 處理訪問Metastore程序中出現的例外,例外主要分三類:
// a) 用戶操作例外或元資料例外,將例外拋給用戶處理并結束回圈
// b) 底層連接例外,例如網路問題、Metastore服務例外(停服、連接超限等)等支持自動重連,進入步驟4
// c) 其他未知例外,拋給用戶處理并結束回圈
} catch (UndeclaredThrowableException e) {
throw e.getCause();
} catch (InvocationTargetException e) {
Throwable t = e.getCause();
if (t instanceof TApplicationException) {
TApplicationException tae = (TApplicationException)t;
switch (tae.getType()) {
case TApplicationException.UNSUPPORTED_CLIENT_TYPE:
case TApplicationException.UNKNOWN_METHOD:
case TApplicationException.WRONG_METHOD_NAME:
case TApplicationException.INVALID_PROTOCOL:
throw t;
default:
caughtException = tae;
}
} else if ((t instanceof TProtocolException) || (t instanceof TTransportException)) {
caughtException = (TException)t;
} else if ((t instanceof MetaException) && t.getMessage().matches(
"(?s).*(JDO[a-zA-Z]*|TProtocol|TTransport)Exception.*") &&
!t.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) {
caughtException = (MetaException)t;
} else {
throw t;
}
} catch (MetaException e) {
if (e.getMessage().matches("(?s).*(IO|TTransport)Exception.*") &&
!e.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) {
caughtException = e;
} else {
throw e;
}
}
// 4. 對于支持自動重試的例外,會記錄重試次數并驗證次數是否超限,是則回傳例外并結束回圈,否則將以warn形式輸出例外日志提醒并等等一段時間后開始下一次回圈自動重試訪問Metastore,這里用到的重試次數引數和等待時間引數分別是 hive.metastore.failure.retries,hive.metastore.client.connect.retry.delay
if (retriesMade >= retryLimit) {
throw caughtException;
}
retriesMade++;
Thread.sleep(retryDelaySeconds * 1000);
}
return ret;
}
protected RetryingMetaStoreClient(HiveConf hiveConf, Class<?>[] constructorArgTypes,
Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap,
Class<? extends IMetaStoreClient> msClientClass) throws MetaException {
this.retryLimit = hiveConf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES);
this.retryDelaySeconds = hiveConf.getTimeVar(
HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
this.metaCallTimeMap = metaCallTimeMap;
this.connectionLifeTimeInMillis = hiveConf.getTimeVar(
HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME, TimeUnit.MILLISECONDS);
......
this.base = (IMetaStoreClient) MetaStoreUtils.newInstance(
msClientClass, constructorArgTypes, constructorArgs);
}
? 從 RetryingMetaStoreClient 的建構式中可以發現,RetryingMetaStoreClient 維護了一個 HiveMetaStoreClient 物件,用戶在上層呼叫一次 RetryingMetaStoreClient 物件操作,例如第一步的 createDatabase 方法,會經過 RetryingMetaStoreClient.invoke 的封裝最終呼叫HiveMetaStoreClient類中的同名方法進行操作,在 RetryingMetaStoreClient.invoke 中封裝了自動重試的邏輯,在底層與Metastore的連接程序中出現例外的情況下會自動重試而不影響上層用戶的操作,
? 這里我們在注釋中標注了 invoke 方法中主要的操作步驟,可以看到,重試次數由引數hive.metastore.failure.retries控制,兩次重試之間的等待時間由hive.metastore.client.connect.retry.delay控制,
? 注意,這里我們說的是“重試”,而不是“重連”,一次重試中與Metastore的互動有兩步:1. 建立與Metastore的會話 2. 執行用戶請求,我們繼續看下客戶端是怎么建立與Metastore的會話的
4. Metastore重連
// org.apache.hadoop.hive.metastore.HiveMetaStoreClient.java
@Override
public void reconnect() throws MetaException {
......
close();
// 當配置了多個Metastore時,會隨機調整Metastore順序
promoteRandomMetaStoreURI();
open();
}
private void open() throws MetaException {
isConnected = false;
......
// hive.metastore.client.socket.timeout
int clientSocketTimeout = (int) conf.getTimeVar(
ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
for (URI store : metastoreUris) {
try {
transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
......
try {
transport.open();
isConnected = true;
} catch (TTransportException e) {
......
}
......
} catch (MetaException e) {
......
}
if (isConnected) {
break;
}
}
// Wait before launching the next round of connection retries.
if (!isConnected && retryDelaySeconds > 0) {
try {
Thread.sleep(retryDelaySeconds * 1000);
} catch (InterruptedException ignore) {}
}
}
if (!isConnected) {
throw new MetaException("Could not connect to meta store using any of the URIs provided." +
" Most recent failure: " + StringUtils.stringifyException(tte));
}
......
}
public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded)
throws MetaException {
......
// hive.metastore.connect.retries
retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
// hive.metastore.client.connect.retry.delay
retryDelaySeconds = conf.getTimeVar(
ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
......
// 初始化一個HiveMetaStoreClient物件時會嘗試建立與Metastore的長會話
open();
}
? 同上一步的重試邏輯類似,與Metastore的連接支持自動重連,由 hive.metastore.connect.retries 控制重連次數,hive.metastore.client.connect.retry.delay 控制重連等待時間,底層利用Thrift提供的RPC通信服務,
? 如果配置了多個Metastore地址,每一次重連的時候會按順序遍歷所有的Metastore并嘗試與之建立會話,直到有一個會話建立成功為止,
? 此外,初始化一個HiveMetaStoreClient物件時會呼叫open()方法嘗試建立一個與Metastore的長會話,供后面的用戶請求使用
總結
- HiveMetaStoreClient.open() 方法建立一個與Metastore的會話,該方法中會在連接失敗的情況下自動重連,重連次數、重連等待時間分別由引數 hive.metastore.connect.retries 、 hive.metastore.client.connect.retry.delay 控制,且每次重連時會遍歷用戶配置的所有的Metastore直到成功建立一個會話
- 用戶新建一個Metastore客戶端(例如啟動一個CLI、Hiveserver2行程)時,會初始化并維護一個IMetaStoreClient物件,在初始化時呼叫 HiveMetaStoreClient.open()方法建立一個與Metastore的長會話
- 用戶每次呼叫IMetaStoreClient中的方法進行業務操作,實際上委托給 RetryingMetaStoreClient.invoke 方法操作,在遇到與Metastore連接等例外時會進行自動重試,重試次數、重試等待時間分別由引數 hive.metastore.failure.retries 、 hive.metastore.client.connect.retry.delay 控制
- RetryingMetaStoreClient.invoke 中每次重試會嘗試呼叫 HiveMetaStoreClient.reconnect() 方法重連Metastore,HiveMetaStoreClient.reconnect() 方法內會呼叫 HiveMetaStoreClient.open() 去連接Metastore,因此,invoke方法實際上在重試回圈中嵌套了回圈重連Metastore的操作
- 所以 hive.metastore.failure.retries 引數實際上僅用于在已經建立了Metastore的會話的基礎上進行正常的業務訪問程序中遇到連接例外等問題時的重試次數限制,而 hive.metastore.connect.retries 則是更底層自動重連Metastore的次數限制
- 此外,hive.server2.thrift.client.connect.retry.limit 同 hive.server2.thrift.client.retry.limit 的區別也與hive.metastore.connect.retries 和 hive.metastore.failure.retries的區別類似,這里就不再贅述,有興趣的同學可以參照本篇檔案去研究下原始碼
結論
- 僅停止Mysql服務
- Metastore重試總時間 = hive.hmshandler.retry.attempts * hive.hmshandler.retry.interval
- CLI、Hiveserver2會報JDO相關的例外,并斷開與Metastore的連接
- 當CLI、Hiveserver2與Metastore的連接無回應時間超過hive.metastore.client.socket.timeout值會自動斷開連接
- 僅停止metastore服務,CLI、Hiveserver2會列印“Failed to connect to the MetaStore Server”及重連失敗的例外,但會在多次重連仍然失敗后才退出
- 已經與Metastore建立會話的情況下,客戶端的每一次業務請求的重試總時間 = hive.metastore.connect.retries * (hive.metastore.failure.retries + 1) hive.metastore.client.connect.retry.delay*
- 停服期間客戶端新建一個Metastore連接程序中重試總時間間隔 = hive.metastore.connect.retries * hive.metastore.client.connect.retry.delay
- 關聯Beeline會卡住,直到Hiveserver2走完一個完整的重連Metastore周期后放棄連接Metastore為止,此時Hiveserver2會回傳例外
- 僅關閉Hiveserver2服務,Beeline直接報錯,不會重試,需要手動重連
- 以上各組件除Beeline外均可在上游服務恢復后自動恢復,通過自動重連機制實作
- 綜上,建議進行底層服務變更(Metastore或MySQL)時停止Metastore服務,在停服之前需提前配置好Metastore客戶端超時重連的相關引數,預留適當的變更時間
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/196095.html
標籤:其他
