前言
從半年現在從0開始搭建Flink實時計算平臺,部分存盤層用到了Elasticsearch,從零開始接觸Flink,這半年來遇到了好多坑,由傳統的開發轉變成了大資料開發,Elasticsearch內含有多種熔斷器,為了防止OOM,由于目前業務查詢的方式會造成成本很高,(可以看一下allow_expensive_querys),某次查詢可能會引起服務的熔斷,這時候有可能引起實時任務 sink Elasticsearch請求也會被熔斷,
當然 Flink Connector 提供了幾種失敗處理機制
IgnoringFailureHandler: 會忽略所有 sink elasticsearch Connector的例外 ;NoOpFailureHandler: 不處理任何例外,只輸出例外堆疊資訊(默認);RetryRejectedExecutionFailureHandler: 遇到特定例外時會進行重試 包涵EsRejectedExecutionException類以及他的子類,
當我們遇到更新比較多頻繁的時候,用IgnoringFailureHandler當寫入ES失敗時不影響Flink任務,當然遇到比較敏感統計時,我們需要對失敗的結果集進行重試,
需要配合RetryRejectedExecutionFailureHandler 來進行處理,原始碼中只會處理EsRejectedExecutionException類以及他的子類,當然熔斷型別的例外歸屬于ElasticsearchStatusException 例外,兩者并沒有關系,為防止Flink因elasticsearch集群熔斷導致掛掉,我們需要做特定的處理,重寫ActionRequestFailureHandler,
重寫處理類
策略類
為了可以更好地擴展,我們首先定義一個策略類ElasticsearchExceptionHandlerStrategy代碼如下:
/**
* @author liweigao
* @date 2021/12/2 下午11:17
*/
@Getter
public enum ElasticsearchExceptionHandlerStrategy {
/**
* 默認空不處理或默認使用父類 由handler來決定實作
*/
DEFAULT(Lists.newArrayList()),
/***
* 全部例外 Throwable 級別
* 需要注意
*/
ALL_EXCEPTION(Lists.newArrayList(Throwable.class)),
/**
* @see org.elasticsearch.ElasticsearchException
* @see org.elasticsearch.ElasticsearchException.ElasticsearchExceptionHandle
* <p>
* elasticsearch 封裝的例外
*/
ELASTICSEARCH_EXCEPTION(Lists.newArrayList(ElasticsearchException.class)),
/**
* @see org.elasticsearch.ElasticsearchStatusException
* @see org.elasticsearch.rest.RestStatus
* @see EsRejectedExecutionException
* <p>
* elasticsearch 狀態例外
* todo 可根據相應的例外進行細化~
* 可進行通信的鏈接狀態錯誤(比如 es熔斷導致的429錯誤)
*/
ELASTICSEARCH_STATUS_AND_REJECTED_EXCEPTION(Lists.newArrayList(org.elasticsearch.ElasticsearchStatusException.class,
EsRejectedExecutionException .class)),;
final List<Class<? extends Throwable>> exceptionClass;
ElasticsearchExceptionHandlerStrategy(List<Class<? extends Throwable>> exceptionClass) {
this.exceptionClass = exceptionClass;
}
}
定義了四種策略
ALL_EXCEPTION全部例外ELASTICSEARCH_EXCEPTIONELASTICSEARCH_EXCEPTION elasticsearch全部例外ELASTICSEARCH_STATUS_AND__EXCEPTIONEsRejectedExecutionException和ElasticsearchStatusException例外DEFAULT默認空不處理或默認使用父類, 由handler來決定實作
可根據實際業務去擴展ElasticsearchExceptionHandlerStrategy 列舉類,
重寫例外處理類
RetryExecutionFailureHandler: 特定的例外失敗重試 如果策略為DEFAULT時 會交由父類去處理(RetryRejectedExecutionFailureHandler) 代碼如下:
/**
* 可重試例外處理,根據{@link ElasticsearchExceptionHandlerStrategy} 進行處理
*
* @author liweigao
* @date 2021/12/2 下午11:27
*/
@Slf4j
public class RetryExecutionFailureHandler extends RetryRejectedExecutionFailureHandler {
private static final long serialVersionUID = -1;
private ElasticsearchExceptionHandlerStrategy strategy;
@Nullable
public RetryExecutionFailureHandler(ElasticsearchExceptionHandlerStrategy strategy) {
this.strategy = strategy;
}
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
if (Objects.isNull(strategy) || CollectionUtils.isEmpty(strategy.getExceptionClass())) {
super.onFailure(action, failure, restStatusCode, indexer);
return;
}
log.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
for (Class<? extends Throwable> exceptionClass : strategy.getExceptionClass()) {
if (ExceptionUtils.findThrowable(failure, exceptionClass).isPresent()) {
indexer.add(action);
return;
}
}
// rethrow all other failures
throw failure;
}
}
IgnoringExceptionFailureHandler: 特定的例外忽略 如果策略為DEFAULT時 類似于IgnoringFailureHandler處理代碼如下:
/**
* 忽略特定例外,如果沒指定時默認為全部忽略
*
* @author liweigao
* @date 2021/12/2 下午11:35
*/
@Slf4j
public class IgnoringExceptionFailureHandler implements ActionRequestFailureHandler {
private static final long serialVersionUID = -1;
private ElasticsearchExceptionHandlerStrategy strategy;
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
if (Objects.isNull(strategy) || CollectionUtils.isEmpty(strategy.getExceptionClass())) {
return;
}
log.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
for (Class<? extends Throwable> exceptionClass : strategy.getExceptionClass()) {
if (ExceptionUtils.findThrowable(failure, exceptionClass).isPresent()) {
return;
}
}
// rethrow all other failures
throw failure;
}
}
Sink ES 代碼詳細配置
偽代碼如下:
ElasticsearchSink.Builder<Object> builder = new ElasticsearchSink.Builder<Object>(httpHosts,
new ElasticsearchSinkFunction(){...});
//配置批量提交
builder.setBulkFlushBackoff(true);
//設定重試次數
builder.setBulkFlushBackoffRetries(2);
//設定重試間隔
builder.setBulkFlushBackoffDelay(2000L);
//設定重試策略CONSTANT: 常數 eg: 重試間隔為2s 重試3次 會在2s->4s->6s進行; EXPONENTIAL:指數 eg: 重試間隔為2s 重試3次 會在2s->4s->8s進行
builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.CONSTANT);
//設定批量提交最大資料量
builder.setBulkFlushMaxSizeMb(10);
//設定批量提交間隔
builder.setBulkFlushInterval(2000L);
//設定批量提交的最大條數
builder.setBulkFlushMaxActions(1000);
//設定重試機制
builder.Builder<Object>.setFailureHandler(new RetryExecutionFailureHandler(ElasticsearchExceptionHandlerStrategy.DEFAULT));
Elasticsearch失敗重試機制依賴于checkpoint 可參看原始碼:
ElasticsearchSinkBase類
總結
以上拙見,畢竟才入坑,歡迎交流~ 推薦一波Flink 的發布平臺,切記:沒有最優的公共配置,需要根據特定場景才能達到相應的效果,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/375791.html
標籤:其他
