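Business background
The big-data team no longer maintains the service that syncs DB data into ES, so our business team took it over. Implementation: use canal to listen to the DB binlog and write the data into ES.
Problem description
Why look at the ES thread pool at all? Because production suddenly started throwing a flood of errors saying the ES thread pool was full, and we could not even figure out how to change the ES thread pool configuration. The production exception stack trace: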
EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$7@4d334adf on EsThreadPoolExecutor[bulk, queue capacity = 2000, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@3c0a3704[Running, pool size = 16, active threads = 16, queued tasks = 2009, completed tasks = 284167676]]]
at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:50)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:94)
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:89)
at org.elasticsearch.transport.TransportService.sendLocalRequest(TransportService.java:614)
at org.elasticsearch.transport.TransportService.access$000(TransportService.java:73)
at org.elasticsearch.transport.TransportService$3.sendRequest(TransportService.java:133)
at org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:562)
at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:495)
at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:483)
at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.performAction(TransportReplicationAction.java:751)
at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.performLocalAction(TransportReplicationAction.java:670)
at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.doRun(TransportReplicationAction.java:658)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at org.elasticsearch.action.support.replication.TransportReplicationAction.doExecute(TransportReplicationAction.java:147)
at org.elasticsearch.action.support.replication.TransportReplicationAction.doExecute(TransportReplicationAction.java:93)
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:170)
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:142)
at org.elasticsearch.action.support.replication.TransportReplicationAction$OperationTransportHandler.messageReceived(TransportReplicationAction.java:222)
at org.elasticsearch.action.support.replication.TransportReplicationAction$OperationTransportHandler.messageReceived(TransportReplicationAction.java:219)
at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:69)
at org.elasticsearch.transport.TcpTransport$RequestHandler.doRun(TcpTransport.java:1488)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at org.elasticsearch.common.util.concurrent.EsExecutors$1.execute(EsExecutors.java:109)
at org.elasticsearch.transport.TcpTransport.handleRequest(TcpTransport.java:1445)
at org.elasticsearch.transport.TcpTransport.messageReceived(TcpTransport.java:1329)
at org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:74)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:527)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:481)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at java.lang.Thread.run(Thread.java:745)
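Problem analysis
There is hardly anything to analyze: the ES bulk thread pool is obviously saturated (all 16 threads are active and the queue, capacity 2000, already holds 2009 tasks). The pool configuration is stingy, a size of only 16, and ES's default thread pool sizes are pretty stingy in general.
ES client initialization code in the service: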
// One transport address per configured host
InetSocketTransportAddress[] addresses = new InetSocketTransportAddress[hostNames.length];
for (int i = 0; i < hostNames.length; i++) {
    addresses[i] = new InetSocketTransportAddress(
            new InetSocketAddress(hostNames[i], esOutputConfig.getPort()));
}
// Only cluster.name is set, so every client-side thread pool keeps its default size
Settings.Builder builder = Settings.builder()
        .put("cluster.name", esOutputConfig.getClusterName());
Settings settings = builder.build();
transportClient = new PreBuiltTransportClient(settings);
transportClient.addTransportAddresses(addresses);
transportClients.add(transportClient);
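Solution
Increase the thread pool size.
At first we did not know how to configure the ES thread pool size and could not find anything online, so as a temporary workaround we increased the number of ES clients (indirectly enlarging the total thread pool size).
After reading the ES source, we added the following configuration (V1):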
Settings.Builder builder = Settings.builder()
        .put("cluster.name", esOutputConfig.getClusterName());
// Enlarge the bulk pool only for the order sync tasks
if (taskConfig.getName().contains("order_detail_shard")
        || taskConfig.getName().contains("order_shard")) {
    builder.put("thread_pool.bulk.size", 100);
}
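Having seen V1, you can probably guess the outcome. That's right: it did not take effect. Production kept throwing thread-pool-full errors, and the pool size was still 16.
Reading the ES client thread pool source
The ThreadPool constructor sets up a large number of pools; the key is understanding how ES uses its Settings: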
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
    super(settings);
    assert Node.NODE_NAME_SETTING.exists(settings);
    final Map<String, ExecutorBuilder> builders = new HashMap<>();
    final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
    final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
    final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
    final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
    builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
    builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
    builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse
    ...
}
Our workload mainly uses the write pool, i.e. the BULK pool, which is built by FixedExecutorBuilder:
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) {
    this(settings, name, size, queueSize, "thread_pool." + name);
}
Its size argument is availableProcessors.
How availableProcessors is resolved:
public static final Setting<Integer> PROCESSORS_SETTING =
        Setting.intSetting("processors", Math.min(32, Runtime.getRuntime().availableProcessors()), 1, Property.NodeScope);

public static int boundedNumberOfProcessors(Settings settings) {
    /* This relates to issues where machines with large number of cores
     * ie. >= 48 create too many threads and run into OOM see #3478
     * We just use an 32 core upper-bound here to not stress the system
     * too much with too many created threads */
    return PROCESSORS_SETTING.get(settings);
}
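The Setting is created by intSetting; the arguments passed to the Setting constructor are:
- key: processors
- default value provider: ignores its input and simply returns the default value
- parser: converts the input to Integer and checks that it is at least the minimum (no maximum is given, so the upper bound is effectively Integer.MAX_VALUE); otherwise it throws IllegalArgumentException
- properties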
public static Setting<Integer> intSetting(String key, int defaultValue, int minValue, Property... properties) {
    return new Setting<>(key, (s) -> Integer.toString(defaultValue), (s) -> parseInt(s, minValue, key), properties);
}
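So availableProcessors resolves as follows: if processors is not set, the default is min(32, number of CPU cores); an explicitly configured value only has to be at least 1 (the minimum is hard-coded).
FixedExecutorBuilder constructor:
- settings: the configuration supplied by the developer when building the client
- name: the pool name, e.g. Names.BULK
- size: the default pool size
- queueSize: the default queue size
- prefix: the setting key prefix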
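To make this rule concrete, here is a minimal sketch in plain Java that models PROCESSORS_SETTING. It is not the Elasticsearch API: the class name ProcessorsSettingSketch is hypothetical, a Map<String, String> stands in for Settings, and the jvmCores parameter stands in for Runtime.getRuntime().availableProcessors().

```java
import java.util.Map;

// Hypothetical, simplified model of PROCESSORS_SETTING; not the Elasticsearch API.
// A Map<String, String> stands in for Settings, and jvmCores stands in for
// Runtime.getRuntime().availableProcessors().
public class ProcessorsSettingSketch {

    static int boundedNumberOfProcessors(Map<String, String> settings, int jvmCores) {
        // Default provider: ignores the settings and returns min(32, cores) as a string
        String defaultValue = Integer.toString(Math.min(32, jvmCores));
        // An explicit "processors" entry always wins over the default
        String raw = settings.getOrDefault("processors", defaultValue);
        int value = Integer.parseInt(raw);
        // Parser: only the lower bound of 1 is enforced, there is no upper bound
        if (value < 1) {
            throw new IllegalArgumentException("[processors] must be >= 1, got " + raw);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(boundedNumberOfProcessors(Map.of(), 16));                    // 16
        System.out.println(boundedNumberOfProcessors(Map.of(), 64));                    // 32
        System.out.println(boundedNumberOfProcessors(Map.of("processors", "200"), 16)); // 200
    }
}
```

Note that the 32-core cap only shapes the default; in this model an explicit value such as 200 passes through unchanged.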
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
    super(name);
    final String sizeKey = settingsKey(prefix, "size");
    this.sizeSetting =
            new Setting<>(
                    sizeKey,
                    s -> Integer.toString(size),
                    s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
                    Setting.Property.NodeScope);
    final String queueSizeKey = settingsKey(prefix, "queue_size");
    this.queueSizeSetting =
            Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
}
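Setting the queue configuration aside for now, the sizeSetting constructor arguments are:
- sizeKey: the setting key
- default value provider: ignores its input and returns the default, i.e. availableProcessors
- parser: parses the configured string into the typed value and validates it
- properties
The parser:
- minimum: hard-coded to 1
- maximum: applyHardSizeLimit; for the BULK and INDEX pools it is availableProcessors + 1 (inclusive, so availableProcessors + 1 itself is still accepted), otherwise Integer.MAX_VALUE
- sizeKey: prefix + ".size", i.e. "thread_pool." + name + ".size", e.g. "thread_pool.bulk.size"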
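The key construction and the hard limit can be modeled the same way. In this sketch the class name BulkSizeLimitSketch is hypothetical and plain strings "bulk"/"index" stand in for ThreadPool.Names.BULK/INDEX. It mirrors the inclusive bounds of Setting.parseInt, so on a 16-core machine 17 is accepted and 18 is the first rejected value.

```java
// Hypothetical, simplified model of FixedExecutorBuilder's size setting; not the Elasticsearch API.
public class BulkSizeLimitSketch {

    // Same result as settingsKey(prefix, "size") with prefix "thread_pool." + name
    static String sizeKey(String poolName) {
        return "thread_pool." + poolName + ".size";
    }

    // Same rule as applyHardSizeLimit: bulk and index are capped at processors + 1
    static int hardSizeLimit(String poolName, int processors) {
        if (poolName.equals("bulk") || poolName.equals("index")) {
            return 1 + processors;
        }
        return Integer.MAX_VALUE;
    }

    // Same contract as Setting.parseInt(s, min, max, key): both bounds are inclusive
    static int parseInt(String s, int min, int max, String key) {
        int value = Integer.parseInt(s);
        if (value < min) {
            throw new IllegalArgumentException("[" + key + "] must be >= " + min + ", got " + s);
        }
        if (value > max) {
            throw new IllegalArgumentException("[" + key + "] must be <= " + max + ", got " + s);
        }
        return value;
    }

    public static void main(String[] args) {
        String key = sizeKey("bulk");
        int max = hardSizeLimit("bulk", 16);
        System.out.println(key + " max = " + max);       // thread_pool.bulk.size max = 17
        System.out.println(parseInt("17", 1, max, key)); // 17
        try {
            parseInt("18", 1, max, key);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());          // [thread_pool.bulk.size] must be <= 17, got 18
        }
    }
}
```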
Building the pool: FixedExecutorBuilder
The pool is built from the settings supplied by the developer:
final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
Reading the settings: org.elasticsearch.threadpool.ExecutorBuilder#getSettings, implemented for this builder by org.elasticsearch.threadpool.FixedExecutorBuilder#getSettings:
@Override
FixedExecutorSettings getSettings(Settings settings) {
    final String nodeName = Node.NODE_NAME_SETTING.get(settings);
    final int size = sizeSetting.get(settings);
    final int queueSize = queueSizeSetting.get(settings);
    return new FixedExecutorSettings(nodeName, size, queueSize);
}
How Setting reads a value:
public T get(Settings settings) {
    String value = getRaw(settings);
    try {
        return parser.apply(value);
    } ...
}
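Getting the raw value:
- key: sizeSetting.getKey(), i.e. thread_pool.bulk.size
- default value provider: ignores its input and returns availableProcessors, the effective number of CPU cores
// 1. settings: the developer-supplied configuration
// 2. getKey(): the key of this Setting instance, e.g. thread_pool.bulk.size for sizeSetting
// 3. defaultValue: the default value provider of this Setting instance, e.g. the one of sizeSetting
public String getRaw(Settings settings) {
    checkDeprecation(settings);
    return settings.get(getKey(), defaultValue.apply(settings));
}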
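Here the default is not used, because we configured thread_pool.bulk.size=100.
The parser then validates the value: the minimum is 1 and the maximum is availableProcessors + 1. Our production machines have 16 cores, so the maximum is 17: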
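The lookup in getRaw boils down to "explicit value if present, otherwise the provider's default". A minimal sketch of that behavior, with a Map<String, String> standing in for Settings; the class GetRawSketch and the hard-coded 16 are hypothetical and only illustrate the rule.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of Setting#getRaw; not the Elasticsearch API.
// A Map<String, String> stands in for Settings.
public class GetRawSketch {

    static String getRaw(Map<String, String> settings, String key,
                         Function<Map<String, String>, String> defaultValue) {
        // Like settings.get(getKey(), defaultValue.apply(settings)): the default is
        // computed eagerly but only returned when the key is absent
        return settings.getOrDefault(key, defaultValue.apply(settings));
    }

    public static void main(String[] args) {
        // Default provider for sizeSetting: ignores its input and returns availableProcessors (16 here)
        Function<Map<String, String>, String> sizeDefault = s -> Integer.toString(16);
        String key = "thread_pool.bulk.size";

        System.out.println(getRaw(Map.of(), key, sizeDefault));           // 16
        System.out.println(getRaw(Map.of(key, "100"), key, sizeDefault)); // 100
    }
}
```

The raw string is only validated afterwards by the parser, which is why an out-of-range explicit value fails instead of silently falling back to the default.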
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey);

// applyHardSizeLimit
private int applyHardSizeLimit(final Settings settings, final String name) {
    if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
        return 1 + EsExecutors.boundedNumberOfProcessors(settings);
    } else {
        return Integer.MAX_VALUE;
    }
}
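So our change did not take effect: 100 (thread_pool.bulk.size) > 17 (effective CPU cores + 1), and the parser throws an IllegalArgumentException.
Summary
For the ES client, enlarging the bulk or index pool takes more than changing the pool setting: you also have to raise the processors setting, which is the number of available cores ES assumes (it defaults to the core count the JVM reports, capped at 32). With the code changed as follows, the problem was solved: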
Settings.Builder builder = Settings.builder()
        .put("cluster.name", esOutputConfig.getClusterName());
if (taskConfig.getName().contains("order_detail_shard")
        || taskConfig.getName().contains("order_shard")) {
    builder.put("thread_pool.bulk.size", 100)
            .put("thread_pool.bulk.queue_size", 100)
            // Raises the hard limit for bulk/index to processors + 1 = 201
            .put("processors", 200);
}
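Putting the rules together, here is a hypothetical end-to-end check (class name BulkPoolConfigCheck is invented, and a Map<String, String> again stands in for Settings) that replays both configurations on a 16-core machine: V1 is rejected, the final configuration resolves to 100.

```java
import java.util.Map;

// Hypothetical end-to-end model (not the Elasticsearch API) of how the client
// resolves thread_pool.bulk.size, combining the rules walked through above.
public class BulkPoolConfigCheck {

    static int resolveBulkSize(Map<String, String> settings, int jvmCores) {
        // processors: explicit value, else min(32, cores)
        int processors = Integer.parseInt(
                settings.getOrDefault("processors", Integer.toString(Math.min(32, jvmCores))));
        String key = "thread_pool.bulk.size";
        // size: explicit value, else processors
        int size = Integer.parseInt(settings.getOrDefault(key, Integer.toString(processors)));
        // Hard limit for bulk: 1 + processors, both bounds inclusive
        int max = 1 + processors;
        if (size < 1 || size > max) {
            throw new IllegalArgumentException("[" + key + "] must be in [1, " + max + "], got " + size);
        }
        return size;
    }

    public static void main(String[] args) {
        // V1: only thread_pool.bulk.size=100 on a 16-core machine, limit is 17
        try {
            resolveBulkSize(Map.of("thread_pool.bulk.size", "100"), 16);
        } catch (IllegalArgumentException e) {
            System.out.println("V1: " + e.getMessage());
        }
        // Final config: processors=200 lifts the limit to 201
        System.out.println("final: " + resolveBulkSize(
                Map.of("thread_pool.bulk.size", "100", "processors", "200"), 16)); // final: 100
    }
}
```

One side effect worth keeping in mind: processors feeds every pool that the ThreadPool constructor sizes from availableProcessors, so with processors=200 the index pool default grows to 200 and the generic pool maximum, boundedBy(4 * availableProcessors, 128, 512), reaches 512.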
When reposting, please credit the source. Article link: https://www.uj5u.com/qita/200418.html
