
Understand It in One Read: A Hardcore Analysis of the Apache DolphinScheduler 3.0 Source Code

2022-09-16 09:12:50 Database


Give it a ⭐ Star · Light the way for open source

https://github.com/apache/dolphinscheduler

Contents

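  • 1 DolphinScheduler Design and Strategy

  • 1.1 Distributed Design

  • 1.1.1 Centralized

  • 1.1.2 Decentralized

  • 1.2 DolphinScheduler Architecture Design

  • 1.3 Fault Tolerance

  • 1.3.1 Crash Fault Tolerance

  • 1.3.2 Failure Retry

  • 1.4 Remote Log Access

  • 2 DolphinScheduler Source Code Analysis

  • 2.1 Project Modules and Configuration Files

  • 2.1.1 Project Modules

  • 2.1.2 Configuration Files

  • 2.2 Main Task-Operation APIs

  • 2.3 Quartz Architecture and Execution Flow

  • 2.3.1 Concepts and Architecture

  • 2.3.2 Initialization and Execution Flow

  • 2.3.3 Cluster Operation

  • 2.4 Master Startup and Execution Flow

  • 2.4.1 Concepts and Execution Logic

  • 2.4.2 Cluster and Slots

  • 2.4.3 Code Execution Flow

  • 2.5 Worker Startup and Execution Flow

  • 2.5.1 Concepts and Execution Logic

  • 2.5.2 Code Execution Flow

  • 2.6 RPC Interaction

  • 2.6.1 Master-Worker Interaction

  • 2.6.2 Interaction Between Other Services and the Master

  • 2.7 Load-Balancing Algorithms

  • 2.7.1 Weighted Random

  • 2.7.2 Linear Load

  • 2.7.3 Smooth Round-Robin

  • 2.8 Log Service

  • 2.9 Alerting

  • 3 Afterword

  • 3.1 Make friends

  • 3.2 References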

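Preface

I came to study Apache DolphinScheduler somewhat by chance. In my day job I maintain a scheduling platform built on top of xxl-job. It ran into a concurrency bottleneck severe enough that optimization and refactoring became unavoidable, so I surveyed the widely used scheduling platforms on the market for ideas.

After reading the DolphinScheduler code, I decided to write down its design and the thinking behind it, which is how this article came about. I have not used it in production, so my understanding of the business side may be incomplete or off in places. Discussion is welcome.

1 DolphinScheduler Design and Strategy

If you are following DolphinScheduler, you probably already know scheduling systems, so I won't spend time on general scheduling terminology. Instead I will focus on process definitions, process instances, task definitions, and task instances. (It is unusual that there is no "job" concept; perhaps this avoids overlapping with Quartz's JobDetail.)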

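  • Task definition: a task of some type, the key building block of a process definition, such as SQL, Shell, Spark, MR, or Python;

  • Task instance: an instantiation of a task, which records the concrete execution state of that task;

  • Process definition: a directed acyclic graph (DAG) of task nodes connected by dependencies;

  • Process instance: an instance of a process definition, created either manually or by a scheduled trigger;

  • Scheduled triggering: the system uses the Quartz distributed scheduler and supports generating cron expressions visually;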
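A process definition is a DAG of task nodes, so a valid execution order is a topological order of that graph. The sketch below uses Kahn's algorithm to compute one. `ProcessDag` and its methods are hypothetical names, not DolphinScheduler's own DAG classes. The sketch also rejects a definition that contains a cycle:

```java
import java.util.*;

// Hypothetical sketch: a process definition as task names plus upstream -> downstream edges.
final class ProcessDag {
    private final Map<String, List<String>> downstream = new LinkedHashMap<>();
    private final Map<String, Integer> inDegree = new LinkedHashMap<>();

    void addTask(String task) {
        downstream.putIfAbsent(task, new ArrayList<>());
        inDegree.putIfAbsent(task, 0);
    }

    // Declares that `to` depends on `from`: `from` must finish before `to` starts.
    void addDependency(String from, String to) {
        addTask(from);
        addTask(to);
        downstream.get(from).add(to);
        inDegree.merge(to, 1, Integer::sum);
    }

    // Kahn's algorithm: repeatedly emit tasks whose upstream tasks have all been emitted.
    List<String> executionOrder() {
        Map<String, Integer> remaining = new HashMap<>(inDegree);
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String task = ready.poll();
            order.add(task);
            for (String next : downstream.get(task)) {
                if (remaining.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("process definition contains a cycle, so it is not a DAG");
        }
        return order;
    }
}
```

For example, suppose `shell` is upstream of both `spark` and `sql`, and both of those are upstream of `python`. In that case `executionOrder()` returns `[shell, spark, sql, python]`.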

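1.1 Distributed Design

Distributed system architectures are broadly either centralized or decentralized. Each has strengths and weaknesses, and the right choice depends on the business.

1.1.1 Centralized

A centralized design is relatively simple: the cluster's nodes are divided by role into Masters and Slaves, as shown below: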

[Figure: centralized Master/Slave architecture]

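Master: the Master mainly distributes tasks and monitors the health of the Slaves. It can balance tasks across the Slaves dynamically, so that no Slave node ends up either overloaded or idle.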

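The centralized design has several problems.

First, once the Master fails, the cluster is left leaderless and collapses.

To address this, most Master/Slave architectures use an active/standby Master design. The standby can be hot or cold, and switchover can be automatic or manual. More and more new systems can also elect and switch to a new Master automatically to improve availability.

Second, if the Scheduler runs on the Master, different tasks of one DAG can run on different machines, but the Master can become overloaded. If the Scheduler runs on a Slave, all tasks of a DAG can only be submitted from that one machine. When there are many parallel tasks, the load on that Slave can be high.

xxl-job uses this design and has the corresponding problems. If the admin (manager) goes down, the cluster collapses. The Scheduler also lives on the admin, which validates and dispatches every task, so the admin risks being overloaded and developers have to find their own workarounds.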

1.1.2 Decentralized

[Figure: decentralized architecture]

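In a decentralized design there is usually no Master/Slave concept: all nodes play the same role and have equal status. The core idea is that no node in the distributed system acts as a "manager" distinct from the others, so there is no single point of failure.

However, without a "manager" node, each node has to communicate with the others to obtain the machine information it needs. Because communication in distributed systems is unreliable, this is much harder to implement. In practice, truly decentralized distributed systems are rare.

Instead, dynamically centralized distributed systems keep emerging. In this architecture the cluster's manager is elected dynamically rather than preconfigured. When a failure occurs, the nodes spontaneously "hold a meeting" to elect a new manager to take charge.

Such elections are usually based on the Raft algorithm. The community has a corresponding PR for Raft, which has not been merged yet.

  • PR link: https://github.com/apache/dolphinscheduler/issues/10874

  • Animated demonstration: http://thesecretlivesofdata.com/

In DolphinScheduler, decentralization means that Masters and Workers register themselves with a registry center. As a result, neither the Master cluster nor the Worker cluster has a fixed center.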

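1.2 DolphinScheduler Architecture Design

The system architecture diagram below is borrowed from the official website. It shows that the scheduling system uses a decentralized design and consists of the UI, API, MasterServer, ZooKeeper, WorkerServer, Alert, and other components.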

[Figure: DolphinScheduler system architecture (from the official website)]

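API: the API layer, mainly responsible for handling requests from the front-end UI. It exposes RESTful APIs to external callers. These cover creating, defining, querying, modifying, releasing, and taking workflows offline, as well as starting them manually, stopping, pausing, resuming, and running from a given node.

MasterServer: MasterServer follows a distributed, centerless design. It integrates Quartz and is mainly responsible for splitting DAGs into tasks and for submitting and monitoring those tasks. It also monitors the health of the other MasterServers and the WorkerServers. On startup, a MasterServer registers an ephemeral node in ZooKeeper and performs fault tolerance by watching for changes to ZooKeeper's ephemeral nodes.

WorkerServer: WorkerServer also follows a distributed, centerless design. It is mainly responsible for executing tasks and providing the log service. On startup, a WorkerServer registers an ephemeral node in ZooKeeper and keeps a heartbeat.

ZooKeeper: the ZooKeeper service. Both MasterServer and WorkerServer nodes use ZooKeeper for cluster management and fault tolerance. The system also relies on ZooKeeper for event listening and distributed locks.

Alert: provides alert-related interfaces, mainly for storing, querying, and sending notifications for two types of alert data. It supports flexible extension through a rich set of alert plugins.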

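1.3 Fault Tolerance

Fault tolerance covers service-crash fault tolerance and task retry. Service-crash fault tolerance is further divided into Master fault tolerance and Worker fault tolerance.

1.3.1 Crash Fault Tolerance

Service fault tolerance relies on ZooKeeper's Watcher mechanism. The principle is shown below: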

[Figure: ZooKeeper Watcher-based fault tolerance]

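Each Master watches the directories of the other Masters and of the Workers. When it receives a remove event, it performs process-instance or task-instance fault tolerance according to the specific business logic. The failover flowchart below is somewhat easier to follow than the one in the official documentation, for reference: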

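[Figure: Master/Worker failover flowchart]

After Master failover completes, the Scheduler thread in DolphinScheduler takes over scheduling again. It traverses the DAG to find tasks in the "running" and "submitted successfully" states:

- For "running" tasks, it monitors the state of their task instances.
- For "submitted successfully" tasks, it checks whether they already exist in the Task Queue. If they do, it monitors the task instance state in the same way; if not, it resubmits the task instance.

Once the Master Scheduler thread finds a task instance in the "needs fault tolerance" state, it takes over the task and resubmits it. Note that "network jitter" can make a node lose its heartbeat with ZooKeeper for a short time, which triggers a remove event for that node.

For this case we use the simplest approach: as soon as a node's connection to ZooKeeper times out, the Master or Worker service is stopped outright.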
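The in-memory simulation below makes the watcher-based failover concrete. It is a sketch under stated assumptions, not ZooKeeper or the DolphinScheduler registry API: `InMemoryRegistry`, `FailoverMaster`, the `/nodes/master/...` path layout, and the takeover bookkeeping are all hypothetical. Servers register ephemeral nodes, and an expired session removes them. The surviving Master's watcher then reacts to the remove event by taking over the dead server's instances.

```java
import java.util.*;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a registry such as ZooKeeper (single-threaded sketch).
final class InMemoryRegistry {
    // path -> owning session id; ephemeral nodes disappear when their session ends
    private final Map<String, String> ephemeralNodes = new LinkedHashMap<>();
    private final List<Consumer<String>> removeWatchers = new ArrayList<>();

    void registerEphemeral(String path, String sessionId) {
        ephemeralNodes.put(path, sessionId);
    }

    void watchRemovals(Consumer<String> watcher) {
        removeWatchers.add(watcher);
    }

    // Simulates a session timeout: the session's ephemeral nodes are deleted, then watchers fire.
    void expireSession(String sessionId) {
        List<String> removed = new ArrayList<>();
        ephemeralNodes.entrySet().removeIf(e -> {
            if (e.getValue().equals(sessionId)) {
                removed.add(e.getKey());
                return true;
            }
            return false;
        });
        for (String path : removed) {
            removeWatchers.forEach(w -> w.accept(path));
        }
    }

    Set<String> livePaths() {
        return ephemeralNodes.keySet();
    }
}

// Hypothetical failover bookkeeping on a surviving Master.
final class FailoverMaster {
    private final String host;
    // host -> ids of process instances currently owned by that host
    final Map<String, List<Integer>> instancesByHost = new HashMap<>();

    FailoverMaster(String host, InMemoryRegistry registry) {
        this.host = host;
        registry.watchRemovals(this::onNodeRemoved);
    }

    // Remove event: the last path segment is taken as the dead host's name.
    private void onNodeRemoved(String path) {
        String deadHost = path.substring(path.lastIndexOf('/') + 1);
        List<Integer> orphaned = instancesByHost.remove(deadHost);
        if (orphaned != null) {
            // take over the dead server's instances so they can be resubmitted
            instancesByHost.computeIfAbsent(host, h -> new ArrayList<>()).addAll(orphaned);
        }
    }
}
```

Real survivors could race to handle the same remove event, which is where something like the distributed locks mentioned in 1.2 would come in. The sketch omits that coordination.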

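1.3.2 Failure Retry

First, distinguish between task failure retry, process failure recovery, and process failure rerun:

  1. Task failure retry is task-level and performed automatically by the scheduler. For example, if a Shell task is configured with 3 retries, then after it fails it will try to run again at most 3 more times.

  2. Process failure recovery is process-level and performed manually. Recovery can only resume from the failed node or from the current node.

  3. Process failure rerun is also process-level and performed manually. A rerun starts again from the start node.

Now to the main point. We divide the task nodes in a workflow into two types:

  1. Business nodes, each of which corresponds to an actual script or statement, such as Shell, MR, Spark, and dependency nodes.

  2. Logic nodes, which run no actual script or statement and only handle how the process flows, such as sub-process nodes.

Each business node can be configured with a number of failure retries. When the task node fails, it is retried automatically until it succeeds or the configured number of retries is exceeded. Logic nodes do not support failure retry, but the tasks inside a logic node do.

If a task in a workflow still fails after reaching its maximum number of retries, the workflow fails and stops. A failed workflow can then be rerun or recovered manually.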
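The task-level retry rule can be sketched as a loop. `RetryingTask` and `runWithRetry` are hypothetical names, and DolphinScheduler's real retry path (resubmission, retry intervals) is not shown. The sketch only captures the rule "run once, then retry until success or until the configured retry count is used up":

```java
import java.util.function.Supplier;

// Hypothetical sketch of task-level failure retry.
final class RetryingTask {
    // Result of running a task: whether it finally succeeded and how many attempts were made.
    static final class Outcome {
        final boolean success;
        final int attempts;

        Outcome(boolean success, int attempts) {
            this.success = success;
            this.attempts = attempts;
        }
    }

    // Runs the task once, then retries up to maxRetries more times while it keeps failing.
    static Outcome runWithRetry(Supplier<Boolean> task, int maxRetries) {
        int attempts = 0;
        while (true) {
            attempts++;
            if (task.get()) {
                return new Outcome(true, attempts);
            }
            if (attempts > maxRetries) {
                // retries exhausted: the task fails, and so does the workflow containing it
                return new Outcome(false, attempts);
            }
        }
    }
}
```

With `maxRetries = 3`, a task that always fails runs 4 times in total: 1 initial run plus 3 retries. This matches the Shell example above.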

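1.4 Remote Log Access

The Web (UI) and the Worker are not necessarily on the same machine, so logs cannot be read the way a local file is.

There are two options:

  1. Ship the logs to the Elasticsearch (ES) search engine;

  2. Fetch the remote log information over Netty;

To keep DolphinScheduler as lightweight as possible, RPC was chosen for remote log access. See section 2.8 for the code.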

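2 DolphinScheduler Source Code Analysis

The explanation in the previous chapter may not be very clear at first glance. This chapter walks through the features described in Chapter 1 one by one, at the code level. System installation is not covered here; please explore installing and running it on your own.

2.1 Project Modules and Configuration Files

2.1.1 Project Modules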

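  • dolphinscheduler-alert: alert module, provides the alert service;

  • dolphinscheduler-api: web application module, provides the REST API called by the UI;

  • dolphinscheduler-common: common constants, enums, utility classes, data structures, and base classes;

  • dolphinscheduler-dao: database access and related operations;

  • dolphinscheduler-remote: Netty-based client and server;

  • dolphinscheduler-server: log and heartbeat services;

  • dolphinscheduler-log-server: LoggerServer, which the REST API uses to view logs over RPC;

  • dolphinscheduler-master: the MasterServer service, mainly responsible for splitting DAGs and monitoring task state;

  • dolphinscheduler-worker: the WorkerServer service, mainly responsible for submitting and executing tasks and updating task state;

  • dolphinscheduler-service: service module containing the Quartz, ZooKeeper, and log-client access services, called by the server and api modules;

  • dolphinscheduler-ui: front-end module;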

2.1.2 Configuration Files

dolphinscheduler-common common.properties

# local working directory, used for temporary files
data.basedir.path=/tmp/dolphinscheduler
# resource storage type: HDFS, S3, NONE
resource.storage.type=NONE
# resource storage path
resource.upload.path=/dolphinscheduler
# whether Hadoop has Kerberos authentication enabled
hadoop.security.authentication.startup.state=false
# Kerberos config file path
java.security.krb5.conf.path=/opt/krb5.conf
# Kerberos login user
login.user.keytab.username=hdfs-mycluster@ESZ.COM

# Kerberos login user keytab
login.user.keytab.path=/opt/hdfs.headless.keytab

# Kerberos expiration time, an integer in hours
kerberos.expire.time=2
# if the storage type is HDFS, set a user that has the required permissions
hdfs.root.user=hdfs
# request address: if resource.storage.type=S3, the value looks like s3a://dolphinscheduler; if resource.storage.type=HDFS and Hadoop is configured for HA, copy core-site.xml and hdfs-site.xml into the conf directory
fs.defaultFS=hdfs://mycluster:8020
aws.access.key.id=minioadmin
aws.secret.access.key=minioadmin
aws.region=us-east-1
aws.endpoint=http://localhost:9000
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
# YARN ResourceManager addresses: if ResourceManager HA is enabled, enter the HA IP addresses (comma-separated); if ResourceManager is a single node, leave this empty
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# if ResourceManager HA is enabled or ResourceManager is not used, keep the default; if ResourceManager is a single node, replace ds1 with the ResourceManager's hostname
yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s

# datasource encryption enable
datasource.encryption.enable=false

# datasource encryption salt
datasource.encryption.salt=!@#$%^&*

# data quality option
data-quality.jar.name=dolphinscheduler-data-quality-dev-SNAPSHOT.jar

#data-quality.error.output.path=/tmp/data-quality-error-data

# Network IP gets priority, default inner outer

# Whether hive SQL is executed in the same session
support.hive.oneSession=false

# use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
sudo.enable=true

# network interface preferred like eth0, default: empty
#dolphin.scheduler.network.interface.preferred=

# network IP gets priority, default: inner outer
#dolphin.scheduler.network.priority.strategy=default

# system env path
#dolphinscheduler.env.path=dolphinscheduler_env.sh

# whether running in development mode
development.state=false

# rpc port
alert.rpc.port=50052

# Url endpoint for zeppelin RESTful API
zeppelin.rest.url=http://localhost:8080
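common.properties is an ordinary Java properties file. The minimal sketch below reads a few of its keys with `java.util.Properties` and falls back to defaults. `CommonConfig` is a hypothetical class, not DolphinScheduler's own configuration loader, and its defaults simply mirror the values shown above:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch: read common.properties-style text into typed fields with defaults.
final class CommonConfig {
    final String storageType;
    final boolean sudoEnable;
    final int kerberosExpireHours;

    CommonConfig(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText)); // lines starting with '#' or '!' are comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Properties keeps trailing whitespace in values, so trim before interpreting them
        storageType = props.getProperty("resource.storage.type", "NONE").trim();
        sudoEnable = Boolean.parseBoolean(props.getProperty("sudo.enable", "true").trim());
        kerberosExpireHours = Integer.parseInt(props.getProperty("kerberos.expire.time", "2").trim());
    }

    // HDFS or S3 means resources live outside the local machine; NONE disables resource storage.
    boolean usesRemoteStorage() {
        return !"NONE".equalsIgnoreCase(storageType);
    }
}
```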


dolphinscheduler-api application.yaml

server:
  port: 12345
  servlet:
    session:
      timeout: 120m
    context-path: /dolphinscheduler/
  compression:
    enabled: true
    mime-types: text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
  jetty:
    max-http-form-post-size: 5000000

spring:
  application:
    name: api-server
  banner:
    charset: UTF-8
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  servlet:
    multipart:
      max-file-size: 1024MB
      max-request-size: 1024MB
  messages:
    basename: i18n/messages
  datasource:
#    driver-class-name: org.postgresql.Driver
#    url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    password: root
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1
  quartz:
    auto-startup: false
    job-store-type: jdbc
    jdbc:
      initialize-schema: never
    properties:
      org.quartz.threadPool:threadPriority: 5
      org.quartz.jobStore.isClustered: true
      org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
      org.quartz.scheduler.instanceId: AUTO
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.acquireTriggersWithinLock: true
      org.quartz.scheduler.instanceName: DolphinScheduler
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      org.quartz.jobStore.useProperties: false
      org.quartz.threadPool.makeThreadsDaemons: true
      org.quartz.threadPool.threadCount: 25
      org.quartz.jobStore.misfireThreshold: 60000
      org.quartz.scheduler.makeSchedulerThreadDaemon: true
#      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.clusterCheckinInterval: 5000

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
#    connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

audit:
  enabled: false

metrics:
  enabled: true

python-gateway:
  # Whether to enable the python gateway server. The default value is true.
  enabled: true
  # The address the Python gateway server listens on. Set it to `0.0.0.0` if your Python API runs on a different host
  # from the Python gateway server. It can also be set to a specific address such as `127.0.0.1` or `localhost`
  gateway-server-address: 0.0.0.0
  # The port of Python gateway server start. Define which port you could connect to Python gateway server from
  # Python API side.
  gateway-server-port: 25333
  # The address of Python callback client.
  python-address: 127.0.0.1
  # The port of Python callback client.
  python-port: 25334
  # Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite),
  # and socket server would never close even though no requests accept
  connect-timeout: 0
  # Close each active connection of socket server if python program not active after x milliseconds. Define value is
  # (0 = infinite), and socket server would never close even though no requests accept
  read-timeout: 0

# Override by profile

---
spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate



dolphinscheduler-master application.yaml

spring:
  banner:
    charset: UTF-8
  application:
    name: master-server
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  cache:
    # default enable cache, you can disable by `type: none`
    type: none
    cache-names:
      - tenant
      - user
      - processDefinition
      - processTaskRelation
      - taskDefinition
    caffeine:
      spec: maximumSize=100,expireAfterWrite=300s,recordStats
  datasource:
    #driver-class-name: org.postgresql.Driver
    #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    password:
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1
  quartz:
    job-store-type: jdbc
    jdbc:
      initialize-schema: never
    properties:
      org.quartz.threadPool:threadPriority: 5
      org.quartz.jobStore.isClustered: true
      org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
      org.quartz.scheduler.instanceId: AUTO
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.acquireTriggersWithinLock: true
      org.quartz.scheduler.instanceName: DolphinScheduler
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      org.quartz.jobStore.useProperties: false
      org.quartz.threadPool.makeThreadsDaemons: true
      org.quartz.threadPool.threadCount: 25
      org.quartz.jobStore.misfireThreshold: 60000
      org.quartz.scheduler.makeSchedulerThreadDaemon: true
#      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.clusterCheckinInterval: 5000

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
#    connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

master:
  listen-port: 5678
  # master fetch command num
  fetch-command-num: 10
  # master prepare execute thread number to limit handle commands in parallel
  pre-exec-threads: 10
  # master execute thread number to limit process instances in parallel
  exec-threads: 100
  # master dispatch task number per batch
  dispatch-task-number: 3
  # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
  host-selector: lower_weight
  # master heartbeat interval, the unit is second
  heartbeat-interval: 10
  # master commit task retry times
  task-commit-retry-times: 5
  # master commit task interval, the unit is millisecond
  task-commit-interval: 1000
  state-wheel-interval: 5
  # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
  max-cpu-load-avg: -1
  # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
  reserved-memory: 0.3
  # failover interval, the unit is minute
  failover-interval: 10
  # kill the YARN job when failing over a taskInstance, default true
  kill-yarn-job-when-task-failover: true

server:
  port: 5679

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

metrics:
  enabled: true

# Override by profile

---
spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
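The `max-cpu-load-avg` and `reserved-memory` comments describe an overload guard. The server only takes new work while the system load average is below the threshold (where -1 means CPU cores × 2) and available memory stays above the reserve. The minimal sketch below expresses that rule as a pure function. The class and parameter names are hypothetical. Reading the real load and memory values is left to the caller, for example through `OperatingSystemMXBean.getSystemLoadAverage()`:

```java
// Hypothetical sketch of the overload guard described by max-cpu-load-avg and reserved-memory.
final class OverloadGuard {
    private final double maxCpuLoadAvg;
    private final double reservedMemoryGb;

    OverloadGuard(double configuredMaxCpuLoadAvg, double reservedMemoryGb, int cpuCores) {
        // -1 selects the documented default: number of CPU cores * 2
        this.maxCpuLoadAvg = configuredMaxCpuLoadAvg == -1 ? cpuCores * 2.0 : configuredMaxCpuLoadAvg;
        this.reservedMemoryGb = reservedMemoryGb;
    }

    // New work is accepted only if load is below the cap and free memory exceeds the reserve (in GB).
    boolean canAcceptWork(double systemLoadAvg, double availableMemoryGb) {
        return systemLoadAvg < maxCpuLoadAvg && availableMemoryGb > reservedMemoryGb;
    }
}
```

With -1 on a 4-core machine the cap becomes 8.0, so a load average of 7.5 is accepted and 8.0 is not.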

dolphinscheduler-worker application.yaml

spring:
  banner:
    charset: UTF-8
  application:
    name: worker-server
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  datasource:
    #driver-class-name: org.postgresql.Driver
    #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    #password: root
    password:
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
#    connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

worker:
  # worker listener port
  listen-port: 1234
  # worker execute thread number to limit task instances in parallel
  exec-threads: 100
  # worker heartbeat interval, the unit is second
  heartbeat-interval: 10
  # worker host weight to dispatch tasks, default value 100
  host-weight: 100
  # worker tenant auto create
  tenant-auto-create: true
  # worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2
  max-cpu-load-avg: -1
  # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
  reserved-memory: 0.3
  # default worker groups separated by comma, like 'worker.groups=default,test'
  groups:
    - default
  # alert server listen host
  alert-listen-host: localhost
  alert-listen-port: 50052

server:
  port: 1235

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

metrics:
  enabled: true

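The settings to focus on are the database, Quartz, ZooKeeper, master, and worker configuration.

2.2 Main Task-Operation APIs

You can ignore the other business APIs and focus only on the most important one: the API that brings a process schedule online. All scheduling-related code branches out from it.

API: /dolphinscheduler/projects/{projectCode}/schedules/{id}/online. This endpoint submits the defined process to the Quartz scheduling framework. The code is as follows: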

public Map<String, Object> setScheduleState(User loginUser,
                                            long projectCode,
                                            Integer id,
                                            ReleaseState scheduleStatus) {
    Map<String, Object> result = new HashMap<>();

    Project project = projectMapper.queryByCode(projectCode);
    // check project auth
    boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
    if (!hasProjectAndPerm) {
        return result;
    }

    // check schedule exists
    Schedule scheduleObj = scheduleMapper.selectById(id);

    if (scheduleObj == null) {
        putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
        return result;
    }
    // check schedule release state
    if (scheduleObj.getReleaseState() == scheduleStatus) {
        logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
                scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
        putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
        return result;
    }
    ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
    if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
        putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
        return result;
    }
    List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
    if (processTaskRelations.isEmpty()) {
        putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
        return result;
    }
    if (scheduleStatus == ReleaseState.ONLINE) {
        // check process definition release state
        if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
            logger.info("not release process definition id: {} , name : {}",
                    processDefinition.getId(), processDefinition.getName());
            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
            return result;
        }
        // check sub process definition release state
        List<Long> subProcessDefineCodes = new ArrayList<>();
        processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);
        if (!subProcessDefineCodes.isEmpty()) {
            List<ProcessDefinition> subProcessDefinitionList =
                    processDefinitionMapper.queryByCodes(subProcessDefineCodes);
            if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
                for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
                    /**
                     * if there is no online process, exit directly
                     */
                    if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
                        logger.info("not release process definition id: {} , name : {}",
                                subProcessDefinition.getId(), subProcessDefinition.getName());
                        putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId()));
                        return result;
                    }
                }
            }
        }
    }

    // check master server exists
    List<Server> masterServers = monitorService.getServerListFromRegistry(true);

    if (masterServers.isEmpty()) {
        putMsg(result, Status.MASTER_NOT_EXISTS);
        return result;
    }

    // set status
    scheduleObj.setReleaseState(scheduleStatus);

    scheduleMapper.updateById(scheduleObj);

    try {
        switch (scheduleStatus) {
            case ONLINE:
                logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                setSchedule(project.getId(), scheduleObj);
                break;
            case OFFLINE:
                logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                deleteSchedule(project.getId(), id);
                break;
            default:
                putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
                return result;
        }
    } catch (Exception e) {
        result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
        throw new ServiceException(result.get(Constants.MSG).toString(), e);
    }

    putMsg(result, Status.SUCCESS);
    return result;
}

public void setSchedule(int projectId, Schedule schedule) {
        logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());

        quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule);
    }

public void addJob(Class<? extends Job> clazz, int projectId, final Schedule schedule) {
        String jobName = this.buildJobName(schedule.getId());
        String jobGroupName = this.buildJobGroupName(projectId);

        Map<String, Object> jobDataMap = this.buildDataMap(projectId, schedule);
        String cronExpression = schedule.getCrontab();
        String timezoneId = schedule.getTimezoneId();

        /**
         * transform from server default timezone to schedule timezone
         * e.g. server default timezone is `UTC`
         * user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`,
         * api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours,
         * so when add job to quartz, it should recover by transform timezone
         */
        Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
        Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);

        lock.writeLock().lock();
        try {

            JobKey jobKey = new JobKey(jobName, jobGroupName);
            JobDetail jobDetail;
            //add a task (if this task already exists, return this task directly)
            if (scheduler.checkExists(jobKey)) {

                jobDetail = scheduler.getJobDetail(jobKey);
                jobDetail.getJobDataMap().putAll(jobDataMap);
            } else {
                jobDetail = newJob(clazz).withIdentity(jobKey).build();

                jobDetail.getJobDataMap().putAll(jobDataMap);

                scheduler.addJob(jobDetail, false, true);

                logger.info("Add job, job name: {}, group name: {}",
                        jobName, jobGroupName);
            }

            TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
            /*
             * Instructs the Scheduler that upon a mis-fire
             * situation, the CronTrigger wants to have it's
             * next-fire-time updated to the next time in the schedule after the
             * current time (taking into account any associated Calendar),
             * but it does not want to be fired now.
             */
            CronTrigger cronTrigger = newTrigger()
                    .withIdentity(triggerKey)
                    .startAt(startDate)
                    .endAt(endDate)
                    .withSchedule(
                            cronSchedule(cronExpression)
                                    .withMisfireHandlingInstructionDoNothing()
                                    .inTimeZone(DateUtils.getTimezone(timezoneId))
                    )
                    .forJob(jobDetail).build();

            if (scheduler.checkExists(triggerKey)) {
                // updateProcessInstance scheduler trigger when scheduler cycle changes
                CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
                String oldCronExpression = oldCronTrigger.getCronExpression();

                if (!StringUtils.equalsIgnoreCase(cronExpression, oldCronExpression)) {
                    // reschedule job trigger
                    scheduler.rescheduleJob(triggerKey, cronTrigger);
                    logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                            jobName, jobGroupName, cronExpression, startDate, endDate);
                }
            } else {
                scheduler.scheduleJob(cronTrigger);
                logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                        jobName, jobGroupName, cronExpression, startDate, endDate);
            }

        } catch (Exception e) {
            throw new ServiceException("add job failed", e);
        } finally {
            lock.writeLock().unlock();
        }
    }
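The comment in `addJob` describes a timezone round trip. The `java.time` sketch below shows what such a transform has to achieve; it is an illustration, not the code of `DateUtils.transformTimezoneDate`. The stored instant is read back as wall-clock time in the server's zone (UTC) and then reinterpreted in the schedule's own zone:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Sketch: undo the "saved as if it were server time" shift described in the addJob comment.
final class ScheduleTimezone {
    static Instant reinterpret(Instant stored, ZoneId serverZone, ZoneId scheduleZone) {
        // the wall-clock fields the user entered, e.g. 2022-04-28T10:00, read back in the server zone
        LocalDateTime wallClock = LocalDateTime.ofInstant(stored, serverZone);
        // the instant the user actually meant: that wall-clock time in the schedule's zone
        return wallClock.atZone(scheduleZone).toInstant();
    }
}
```

10:00 in Asia/Shanghai (UTC+8) is 02:00 UTC, so reinterpreting `2022-04-28T10:00:00Z` yields `2022-04-28T02:00:00Z`. This removes the 8-hour shift described in the comment.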

2.3 Quartz Architecture and Execution Flow

2.3.1 Concepts and Architecture

The Quartz framework consists mainly of the following parts:

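  • SchedulerFactory: the scheduler factory, which creates and manages Scheduler instances;

  • Scheduler: the task scheduler, responsible for scheduling and for the operations on jobs and triggers;

  • Job: the job interface; an implementing class contains the concrete business code;

  • JobDetail: defines an instance of a job;

  • Trigger: the trigger, which holds the timing policy for running a Job, such as how often, when, and at what frequency it runs;

  • JobBuilder: used to define/build JobDetail instances;

  • TriggerBuilder: used to define/build Trigger instances;

  • Calendar: an extension object for Triggers that can exclude or include specific points in time (for example, excluding public holidays);

  • JobStore: stores jobs and the state produced during scheduling;

A Scheduler's lifecycle starts when the SchedulerFactory creates it and ends when its shutdown() method is called. Once created, a Scheduler can add, remove, and list Jobs and Triggers, and perform other scheduling operations such as pausing a Trigger. However, it only actually fires triggers (that is, runs jobs) after start() is called.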
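A toy model shows how Scheduler, JobDetail, and Trigger relate, and the rule that nothing fires before `start()`. This is plain Java written for illustration, not the Quartz API. The `Toy*` names are hypothetical, triggers are simple fixed-interval ones, and time is advanced by an explicit `tick` instead of a scheduler thread:

```java
import java.util.*;

// Toy model (not Quartz): the "Job" role, holding the business logic.
interface ToyJob {
    void execute(String jobName);
}

// The "Trigger" role: when a job should fire next, and how to advance afterwards.
final class ToyTrigger {
    final String jobName;
    final long intervalMillis;
    long nextFireTime;

    ToyTrigger(String jobName, long startTime, long intervalMillis) {
        this.jobName = jobName;
        this.nextFireTime = startTime;
        this.intervalMillis = intervalMillis;
    }
}

// The "Scheduler" role: stores jobs and triggers, but fires nothing until start() is called.
final class ToyScheduler {
    private final Map<String, ToyJob> jobs = new HashMap<>(); // stands in for stored JobDetails
    private final List<ToyTrigger> triggers = new ArrayList<>();
    private boolean started;

    void scheduleJob(String jobName, ToyJob job, ToyTrigger trigger) {
        jobs.put(jobName, job); // allowed before start()
        triggers.add(trigger);
    }

    void start() {
        started = true;
    }

    // Fires each due trigger at most once per tick and returns how many jobs ran.
    int tick(long now) {
        if (!started) {
            return 0;
        }
        int fired = 0;
        for (ToyTrigger t : triggers) {
            if (t.nextFireTime <= now) {
                jobs.get(t.jobName).execute(t.jobName);
                t.nextFireTime += t.intervalMillis;
                fired++;
            }
        }
        return fired;
    }
}
```

A job registered before `start()` does not run, even when its fire time has long passed. After `start()`, a tick at a due time runs it and moves the trigger to its next fire time.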

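2.3.2 Initialization and Execution Flow

The basic principle of Quartz is that a Scheduler schedules custom business objects that implement the Job interface and are described by a JobDetail and a Trigger. The basic logic is shown below: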

[Figure: basic Quartz scheduling logic]

The code sequence diagram is as follows:

[Figure: Quartz initialization code sequence diagram]

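In short, initialization creates the Scheduler container together with what it needs:

- the thread pool;
- the JobStore, used for data access;
- the QuartzSchedulerThread, which drives the concrete business implementations of the Job interface.

DolphinScheduler's business class is ProcessScheduleJob. Its main job is to write rows into the command table based on the schedule information.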
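The hand-off between the Quartz job and the Master therefore goes through the command table. The job only writes a row, and Masters later fetch commands in batches (compare `fetch-command-num` in the master configuration). The minimal in-memory sketch below shows that producer/consumer pattern. `CommandTable`, `Command`, and the field names are hypothetical stand-ins for the real table and its data-access layer:

```java
import java.util.*;

// Hypothetical in-memory stand-in for the command table.
final class CommandTable {
    static final class Command {
        final long processDefinitionCode;
        final long scheduleTime;

        Command(long processDefinitionCode, long scheduleTime) {
            this.processDefinitionCode = processDefinitionCode;
            this.scheduleTime = scheduleTime;
        }
    }

    private final Deque<Command> rows = new ArrayDeque<>();

    // Producer side: what a scheduled job does when its trigger fires.
    synchronized void insert(Command command) {
        rows.addLast(command);
    }

    // Consumer side: a Master takes at most `fetchNum` commands per round, oldest first.
    synchronized List<Command> fetch(int fetchNum) {
        List<Command> batch = new ArrayList<>();
        while (batch.size() < fetchNum && !rows.isEmpty()) {
            batch.add(rows.pollFirst());
        }
        return batch;
    }
}
```

In this sketch the side that fires the trigger never needs to know which consumer will run the process instance. It only appends a command.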

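2.3.3 Cluster Operation

Points to note:

  1. When Quartz is deployed as a cluster, the job store cannot be in memory; that is, RAMJobStore cannot be used.

  2. A Quartz cluster scans for Triggers that are due under the database lock TRIGGER_ACCESS, which guarantees that only one Quartz instance performs the scan at a time. As the code shows, the lock is taken whenever acquireTriggersWithinLock is true (as configured above) or more than one trigger is acquired per batch. The code is as follows: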

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {

    String lockName;
    if (isAcquireTriggersWithinLock() || maxCount > 1) {
        lockName = LOCK_TRIGGER_ACCESS;
    } else {
        lockName = null;
    }
    return executeInNonManagedTXLock(lockName,
            new TransactionCallback<List<OperableTrigger>>() {
                public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                    return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                }
            },
            new TransactionValidator<List<OperableTrigger>>() {
                public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                    try {
                        List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                        Set<String> fireInstanceIds = new HashSet<String>();
                        for (FiredTriggerRecord ft : acquired) {
                            fireInstanceIds.add(ft.getFireInstanceId());
                        }
                        for (OperableTrigger tr : result) {
                            if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                return true;
                            }
                        }
                        return false;
                    } catch (SQLException e) {
                        throw new JobPersistenceException("error validating trigger acquisition", e);
                    }
                }
            });
}


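  3. When recovering from failed cluster instances, note that recovery is done per instance. The fired-trigger records of each failed instance are recovered separately, which is possible because the database records the instanceId of each scheduler. The code is as follows: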

 protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances)
        throws JobPersistenceException {

        if (failedInstances.size() > 0) {

            long recoverIds = System.currentTimeMillis();

            logWarnIfNonZero(failedInstances.size(),
                    "ClusterManager: detected " + failedInstances.size()
                            + " failed or restarted instances.");
            try {
                for (SchedulerStateRecord rec : failedInstances) {
                    getLog().info(
                            "ClusterManager: Scanning for instance \""
                                    + rec.getSchedulerInstanceId()
                                    + "\"'s failed in-progress jobs.");

                    List<FiredTriggerRecord> firedTriggerRecs = getDelegate()
                            .selectInstancesFiredTriggerRecords(conn,
                                    rec.getSchedulerInstanceId());

                    int acquiredCount = 0;
                    int recoveredCount = 0;
                    int otherCount = 0;

                    Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();

                    for (FiredTriggerRecord ftRec : firedTriggerRecs) {

                        TriggerKey tKey = ftRec.getTriggerKey();
                        JobKey jKey = ftRec.getJobKey();

                        triggerKeys.add(tKey);

                        // release blocked triggers..
                        if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
                            getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_WAITING, STATE_BLOCKED);
                        } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
                            getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_PAUSED, STATE_PAUSED_BLOCKED);
                        }

                        // release acquired triggers..
                        if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
                            getDelegate().updateTriggerStateFromOtherState(
                                    conn, tKey, STATE_WAITING,
                                    STATE_ACQUIRED);
                            acquiredCount++;
                        } else if (ftRec.isJobRequestsRecovery()) {
                            // handle jobs marked for recovery that were not fully
                            // executed..
                            if (jobExists(conn, jKey)) {
                                @SuppressWarnings("deprecation")
                                SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl(
                                        "recover_"
                                                + rec.getSchedulerInstanceId()
                                                + "_"
                                                + String.valueOf(recoverIds++),
                                        Scheduler.DEFAULT_RECOVERY_GROUP,
                                        new Date(ftRec.getScheduleTimestamp()));
                                rcvryTrig.setJobName(jKey.getName());
                                rcvryTrig.setJobGroup(jKey.getGroup());
                                rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
                                rcvryTrig.setPriority(ftRec.getPriority());
                                JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
                                rcvryTrig.setJobDataMap(jd);

                                rcvryTrig.computeFirstFireTime(null);
                                storeTrigger(conn, rcvryTrig, null, false,
                                        STATE_WAITING, false, true);
                                recoveredCount++;
                            } else {
                                getLog()
                                        .warn(
                                                "ClusterManager: failed job '"
                                                        + jKey
                                                        + "' no longer exists, cannot schedule recovery.");
                                otherCount++;
                            }
                        } else {
                            otherCount++;
                        }

                        // free up stateful job's triggers
                        if (ftRec.isJobDisallowsConcurrentExecution()) {
                            getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_WAITING, STATE_BLOCKED);
                            getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_PAUSED, STATE_PAUSED_BLOCKED);
                        }
                    }

                    getDelegate().deleteFiredTriggers(conn,
                            rec.getSchedulerInstanceId());

                    // Check if any of the fired triggers we just deleted were the last fired trigger
                    // records of a COMPLETE trigger.
                    int completeCount = 0;
                    for (TriggerKey triggerKey : triggerKeys) {

                        if (getDelegate().selectTriggerState(conn, triggerKey).
                                equals(STATE_COMPLETE)) {
                            List<FiredTriggerRecord> firedTriggers =
                                    getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
                            if (firedTriggers.isEmpty()) {

                                if (removeTrigger(conn, triggerKey)) {
                                    completeCount++;
                                }
                            }
                        }
                    }

                    logWarnIfNonZero(acquiredCount,
                            "ClusterManager: ......Freed " + acquiredCount
                                    + " acquired trigger(s).");
                    logWarnIfNonZero(completeCount,
                            "ClusterManager: ......Deleted " + completeCount
                                    + " complete triggers(s).");
                    logWarnIfNonZero(recoveredCount,
                            "ClusterManager: ......Scheduled " + recoveredCount
                                    + " recoverable job(s) for recovery.");
                    logWarnIfNonZero(otherCount,
                            "ClusterManager: ......Cleaned-up " + otherCount
                                    + " other failed job(s).");

                    if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {
                        getDelegate().deleteSchedulerState(conn,
                                rec.getSchedulerInstanceId());
                    }
                }
            } catch (Throwable e) {
                throw new JobPersistenceException("Failure recovering jobs: "
                        + e.getMessage(), e);
            }
        }
    }
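To make the instanceId-based rule in `clusterRecover` more concrete, here is a minimal in-memory sketch. It is not Quartz code. The `FiredRecord` record is a hypothetical simplification of `FiredTriggerRecord`, and the sketch models only three outcomes from above: acquired triggers are released, records whose job requests recovery get a one-shot `recover_<instance>_<n>` trigger, and everything else is simply cleaned up. Records owned by other instances are never touched.

```java
import java.util.ArrayList;
import java.util.List;

public class ClusterRecoverySketch {

    // Hypothetical, simplified stand-in for a row in the fired-triggers table.
    record FiredRecord(String instanceId, String triggerName, String state, boolean requestsRecovery) {}

    // Counters mirroring the log lines printed by clusterRecover.
    static final class Result {
        int freedAcquired;
        int scheduledRecovery;
        int cleanedOther;
        final List<String> recoveryTriggers = new ArrayList<>();
    }

    /** Recovers only the records owned by the failed scheduler instance. */
    static Result recover(List<FiredRecord> firedTable, String failedInstanceId) {
        Result r = new Result();
        long recoverId = 0; // Quartz seeds this counter with System.currentTimeMillis()
        for (FiredRecord rec : firedTable) {
            if (!rec.instanceId().equals(failedInstanceId)) {
                continue; // belongs to a healthy instance: leave it alone
            }
            if ("ACQUIRED".equals(rec.state())) {
                r.freedAcquired++; // trigger would go back to WAITING
            } else if (rec.requestsRecovery()) {
                r.recoveryTriggers.add("recover_" + failedInstanceId + "_" + recoverId++);
                r.scheduledRecovery++;
            } else {
                r.cleanedOther++; // record is only deleted
            }
        }
        // Equivalent of deleteFiredTriggers(conn, failedInstanceId).
        firedTable.removeIf(rec -> rec.instanceId().equals(failedInstanceId));
        return r;
    }

    public static void main(String[] args) {
        List<FiredRecord> table = new ArrayList<>(List.of(
                new FiredRecord("node-a", "t1", "ACQUIRED", false),
                new FiredRecord("node-a", "t2", "EXECUTING", true),
                new FiredRecord("node-a", "t3", "EXECUTING", false),
                new FiredRecord("node-b", "t4", "ACQUIRED", false)));
        Result r = recover(table, "node-a");
        // prints "1 1 1 1": one freed, one recovery trigger, one cleaned, node-b's row kept
        System.out.println(r.freedAcquired + " " + r.scheduledRecovery + " " + r.cleanedOther + " " + table.size());
    }
}
```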

2.4 Master Startup and Execution Flow


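2.4.1 Concepts and Execution Logic

Key concepts:

Quartz-related:

  • Scheduler (the task scheduling container, normally a StdScheduler instance);

  • ProcessScheduleJob (a business class that implements the Job interface of the Quartz scheduling framework; its sole purpose is to generate rows in DolphinScheduler's business table t_ds_command);

DolphinScheduler-related:

  • NettyRemotingServer (the Netty server, holding the Netty serverBootstrap object and the server-side business handler serverHandler); NettyServerHandler (the Netty server-side business handler class, holding the various processors and the thread pool each processor runs on);

  • TaskPluginManager (the task plugin manager: each task type is managed as a plugin. When the application starts, factories implementing the TaskChannelFactory interface are loaded via @AutoService and their information is written to the database. The factory objects are then used to load the TaskChannel implementations into a cache);

  • MasterRegistryClient (the master's ZooKeeper client, encapsulating all of the master's ZK operations: registration, query, deletion, and so on);

  • MasterSchedulerService (the scanning service. It contains the business execution threads and the Netty client used to talk to workers, and it is responsible for task scheduling. Slots prevent a task from being scheduled twice in cluster mode, with a ZooKeeper distributed lock underneath);

  • WorkflowExecuteThread (the thread that does the actual business processing. Commands are fetched by slot, and the slot is checked again before execution; if it has changed, the command is skipped. Its key job is to build the task's parameters, definition, priority, and so on, and then put the task on a queue for the queue-processing thread to consume);

  • CommonTaskProcessor (the common task processor, implementing the ITaskProcessor interface. By business type there are common, dependent, sub-process, blocking, and conditions processors. It covers task submission, running, dispatch, killing, and so on, and is loaded via @AutoService; at its core it encapsulates …);

  • TaskPriorityQueueImpl (the task queue, which controls how queued tasks are stored);

  • TaskPriorityQueueConsumer (the task queue consumer thread, which dispatches tasks to workers for execution according to the load-balancing strategy);

  • ServerNodeManager (the node information controller, responsible for updating node registration information and changing slots; built on a ZooKeeper distributed lock);

  • EventExecuteService (the event-processing thread. Through the cached workflow execution threads, it handles the events each task registered in its thread's event queue during processing);

  • FailoverExecuteThread (the failover thread, covering both Master and Worker failover);

  • MasterRegistryDataListener (a failure listener hosted by Curator, the ZooKeeper client framework; it handles the addition and removal of worker and master nodes registered on ZK).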

The master failover code is shown below. For the business explanation, see the Master fault tolerance discussion in 1.3.1:

 private void failoverMasterWithLock(String masterHost) {
        String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
        try {
            registryClient.getLock(failoverPath);
            this.failoverMaster(masterHost);
        } catch (Exception e) {
            LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e);
        } finally {
            registryClient.releaseLock(failoverPath);
        }
    }
 /**
     * failover master
     * <p>
     * failover process instance and associated task instance
     * @param masterHost master host
     */
    private void failoverMaster(String masterHost) {
        if (StringUtils.isEmpty(masterHost)) {
            return;
        }
        Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
        long startTime = System.currentTimeMillis();
        List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
        LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
        List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
            if (Constants.NULL.equals(processInstance.getHost())) {
                continue;
            }

            List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
            for (TaskInstance taskInstance : validTaskInstanceList) {
                LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
                failoverTaskInstance(processInstance, taskInstance, workerServers);
            }

            if (serverStartupTime != null && processInstance.getRestartTime() != null
                && processInstance.getRestartTime().after(serverStartupTime)) {
                continue;
            }

            LOGGER.info("failover process instance id: {}", processInstance.getId());
            //updateProcessInstance host is null and insert into command
            processInstance.setHost(Constants.NULL);
            processService.processNeedFailoverProcessInstances(processInstance);
        }

        LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
    }
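The per-host lock in `failoverMasterWithLock` matters because every surviving master receives the same node-removed event. Below is a minimal sketch of that pattern. It assumes an in-process `ReentrantLock` map as a stand-in for the ZooKeeper lock path from `getFailoverLockPath`, and a hypothetical in-memory host → instance-id map in place of `processService`. The query and the update are deliberately separate steps, like the database calls in the original. Because of that, only the lock stops two masters from re-queuing the same instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class FailoverLockSketch {

    // Stand-in for ZK lock paths: one lock per failed host.
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    // Hypothetical store: host -> ids of process instances still owned by that host.
    final Map<String, List<Integer>> instancesByHost = new ConcurrentHashMap<>();
    // How many process instances were actually re-queued.
    final AtomicInteger requeued = new AtomicInteger();

    void failoverMasterWithLock(String masterHost) {
        ReentrantLock lock = locks.computeIfAbsent(masterHost, h -> new ReentrantLock());
        lock.lock();
        try {
            failoverMaster(masterHost);
        } finally {
            lock.unlock(); // always released, like releaseLock in the finally block
        }
    }

    private void failoverMaster(String masterHost) {
        // Separate query and update steps: without the lock, two masters could both see the same list.
        List<Integer> owned = instancesByHost.get(masterHost);
        if (owned == null || owned.isEmpty()) {
            return; // nothing left: another master already handled this host
        }
        requeued.addAndGet(owned.size()); // stands in for inserting one failover command per instance
        instancesByHost.remove(masterHost); // like setting each instance's host to NULL
    }

    public static void main(String[] args) throws InterruptedException {
        FailoverLockSketch s = new FailoverLockSketch();
        s.instancesByHost.put("10.0.0.1:5678", new ArrayList<>(List.of(1, 2, 3)));
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Two surviving masters react to the same dead master.
        pool.submit(() -> s.failoverMasterWithLock("10.0.0.1:5678"));
        pool.submit(() -> s.failoverMasterWithLock("10.0.0.1:5678"));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("requeued = " + s.requeued.get()); // requeued = 3
    }
}
```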

2.4.2 Cluster and Slots

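Calling this a ZooKeeper distributed lock is both right and not quite right. The slot is computed by taking the command ID modulo the length of the master list. Only the refresh of the master list is guarded by a ZooKeeper distributed lock; each master node's scan of scheduling data is then controlled by its slot.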
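The slot rule can be illustrated on its own. The sketch below uses a hypothetical helper, not DolphinScheduler code. It applies the same predicate as the `queryCommandPageBySlot` SQL, `id % masterCount = slot`, to a list of command IDs. As long as all masters agree on `masterCount`, each command is claimed by exactly one master.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class SlotPartitionSketch {

    /** Returns the command ids this master would scan: id % masterCount == slot. */
    static List<Integer> commandsForSlot(List<Integer> commandIds, int masterCount, int slot) {
        if (masterCount <= 0) {
            return new ArrayList<>(); // mirrors findCommandPageBySlot: no masters, no commands
        }
        return commandIds.stream()
                .filter(id -> id % masterCount == slot)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5, 6, 7);
        int masterCount = 3;
        for (int slot = 0; slot < masterCount; slot++) {
            // slot 0 -> [3, 6], slot 1 -> [1, 4, 7], slot 2 -> [2, 5]
            System.out.println("slot " + slot + " -> " + commandsForSlot(ids, masterCount, slot));
        }
    }
}
```

Suppose one master has already refreshed to `masterCount = 2` while another still uses 3. Then command 4 (4 % 2 = 0, 4 % 3 = 1) could be picked up by both the slot-0 master in the new view and the slot-1 master in the old view. The lock-guarded list refresh and the second `slotCheck` below are meant to narrow that kind of window.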

The code is as follows:

Slot refresh

private void updateMasterNodes() {
        MASTER_SLOT = 0;
        MASTER_SIZE = 0;
        this.masterNodes.clear();
        String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
        try {
            registryClient.getLock(nodeLock);
            Collection<String> currentNodes = registryClient.getMasterNodesDirectly();
            List<Server> masterNodes = registryClient.getServerList(NodeType.MASTER);
            syncMasterNodes(currentNodes, masterNodes);
        } catch (Exception e) {
            logger.error("update master nodes error", e);
        } finally {
            registryClient.releaseLock(nodeLock);
        }

    }
/**
     * sync master nodes
     *
     * @param nodes master nodes
     */
    private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
        masterLock.lock();
        try {
            String addr = NetUtils.getAddr(NetUtils.getHost(), masterConfig.getListenPort());
            this.masterNodes.addAll(nodes);
            this.masterPriorityQueue.clear();
            this.masterPriorityQueue.putList(masterNodes);
            int index = masterPriorityQueue.getIndex(addr);
            if (index >= 0) {
                MASTER_SIZE = nodes.size();
                MASTER_SLOT = index;
            } else {
                logger.warn("current addr:{} is not in active master list", addr);
            }
            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, addr);
        } finally {
            masterLock.unlock();
        }
    }

Slot usage

/**
     * 1. get command by slot
     * 2. do not handle command if slot is empty
     */
    private void scheduleProcess() throws Exception {
        List<Command> commands = findCommands();
        if (CollectionUtils.isEmpty(commands)) {
            //indicate that no command ,sleep for 1s
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            return;
        }

        List<ProcessInstance> processInstances = command2ProcessInstance(commands);
        if (CollectionUtils.isEmpty(processInstances)) {
            return;
        }

        for (ProcessInstance processInstance : processInstances) {
            if (processInstance == null) {
                continue;
            }

            WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
                    processInstance
                    , processService
                    , nettyExecutorManager
                    , processAlertManager
                    , masterConfig
                    , stateWheelExecuteThread);

            this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
            if (processInstance.getTimeout() > 0) {
                stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
            }
            workflowExecuteThreadPool.startWorkflow(workflowExecuteThread);
        }
    }
private List<Command> findCommands() {
        int pageNumber = 0;
        int pageSize = masterConfig.getFetchCommandNum();
        List<Command> result = new ArrayList<>();
        if (Stopper.isRunning()) {
            int thisMasterSlot = ServerNodeManager.getSlot();
            int masterCount = ServerNodeManager.getMasterSize();
            if (masterCount > 0) {
                result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
            }
        }
        return result;
    }
@Override
    public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
        if (masterCount <= 0) {
            return Lists.newArrayList();
        }
        return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
    }
    
 <select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
        select *
        from t_ds_command
        where id % #{masterCount} = #{thisMasterSlot}
        order by process_instance_priority, id asc
            limit #{limit} offset #{offset}
    </select>

// slot check
 private List<ProcessInstance> command2ProcessInstance(List<Command> commands) {
        List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
        CountDownLatch latch = new CountDownLatch(commands.size());
        for (final Command command : commands) {
            masterPrepareExecService.execute(() -> {
                try {
                    // slot check again
                    SlotCheckState slotCheckState = slotCheck(command);
                    if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
                        logger.info("handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
                        return;
                    }
                    ProcessInstance processInstance = processService.handleCommand(logger,
                            getLocalAddress(),
                            command);
                    if (processInstance != null) {
                        processInstances.add(processInstance);
                        logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
                    }
                } catch (Exception e) {
                    logger.error("handle command error ", e);
                    processService.moveToErrorCommand(command, e.toString());
                } finally {
                    latch.countDown();
                }
            });
        }

        try {
            // make sure to finish handling command each time before next scan
            latch.await();
        } catch (InterruptedException e) {
            logger.error("countDownLatch await error ", e);
        }

        return processInstances;
    }

private SlotCheckState slotCheck(Command command) {
        int slot = ServerNodeManager.getSlot();
        int masterSize = ServerNodeManager.getMasterSize();
        SlotCheckState state;
        if (masterSize <= 0) {
            state = SlotCheckState.CHANGE;
        } else if (command.getId() % masterSize == slot) {
            state = SlotCheckState.PASS;
        } else {
            state = SlotCheckState.INJECT;
        }
        return state;
    }

2.4.3 Code Execution Flow

The code is too verbose to paste class by class here; reading the source directly is clearer.

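2.5 Worker Startup and Execution Flow

2.5.1 Concepts and Execution Logic

  • NettyRemotingServer (the Netty server inside the worker); WorkerRegistryClient (the ZooKeeper client that encapsulates the worker's ZK operations: registration, query, deletion, and so on);

  • TaskPluginManager (the task plugin manager, encapsulating the plugin-loading logic and the abstraction over actual task execution);

  • WorkerManagerThread (the task-thread generator: it consumes the task information that the Netty processor pushes onto the queue, creates task execution threads, and submits them to a thread pool);

  • TaskExecuteProcessor (the Netty task-execution processor: it builds the task information dispatched from the master to the worker and pushes it onto the queue);

  • TaskExecuteThread (the task execution thread);

  • TaskCallbackService (the task callback thread, which communicates with the Netty client in the master);

  • AbstractTask (the abstract class for actual task logic; subclasses such as SqlTask and DataXTask contain the real execution logic);

  • RetryReportTaskStatusThread (not covered here)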
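The division of labor in the list above is a classic producer/consumer. The Netty processor enqueues, a `WorkerManagerThread`-like loop dequeues and submits to a thread pool, and a `TaskExecuteThread`-like runnable does the work. Below is a minimal sketch using only `java.util.concurrent`. The `TaskMessage` record, the poison-pill shutdown, and the pool size of 4 are illustrative assumptions, not DolphinScheduler's actual types or defaults.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class WorkerQueueSketch {

    // Hypothetical stand-in for the task information the master dispatches.
    record TaskMessage(int taskInstanceId, String type) {}

    // Sentinel telling the manager loop to stop (compared by reference).
    static final TaskMessage POISON = new TaskMessage(-1, "STOP");

    final BlockingQueue<TaskMessage> queue = new LinkedBlockingQueue<>();
    final ExecutorService taskPool = Executors.newFixedThreadPool(4);
    final ConcurrentLinkedQueue<Integer> finished = new ConcurrentLinkedQueue<>();

    /** Producer side: what a TaskExecuteProcessor-like handler does per request. */
    void onTaskExecuteRequest(TaskMessage msg) throws InterruptedException {
        queue.put(msg);
    }

    /** Consumer side: a WorkerManagerThread-like loop that hands each task to the pool. */
    void runManagerLoop() throws InterruptedException {
        while (true) {
            TaskMessage msg = queue.take(); // blocks until a task arrives
            if (msg == POISON) {
                break;
            }
            // Body of a TaskExecuteThread-like runnable: here it only records completion.
            taskPool.submit(() -> {
                finished.add(msg.taskInstanceId());
            });
        }
        taskPool.shutdown();
        taskPool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        WorkerQueueSketch w = new WorkerQueueSketch();
        Thread manager = new Thread(() -> {
            try {
                w.runManagerLoop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        manager.start();
        for (int id : List.of(101, 102, 103)) {
            w.onTaskExecuteRequest(new TaskMessage(id, "SHELL"));
        }
        w.onTaskExecuteRequest(POISON);
        manager.join();
        System.out.println("finished " + w.finished.size() + " tasks"); // finished 3 tasks
    }
}
```

The queue decouples the Netty I/O thread from task execution, so a slow task never blocks the channel that receives the next request.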

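2.5.2 Code Execution Flow

The sequence diagram for the Worker node code is shown below:

The code is too verbose to paste class by class here; reading the source directly is clearer.

2.6 RPC Interaction

RPC communication between nodes and application services is implemented on Netty. Netty itself is not covered in depth here; this section only covers the design and implementation of the interaction between Master and Worker.

The overall design is shown below:

2.6.1 Master and Worker Interaction

Business interaction between Master and Worker is implemented as RPC over a Netty server and client. When Master and Worker start, each registers its Netty server information under the corresponding ZK node. When the master's task-dispatch thread or task-kill logic runs, it pulls the worker node information from ZK and selects a node with a load-balancing strategy (introduced in the next chapter). It then builds a Netty client to talk to that worker's Netty server. After receiving the master's RPC request, the worker caches the Channel and handles the request. The callback thread later retrieves the cached channel to perform callbacks, which closes the loop.

Task execution, task killing, callback status handling, and similar operations are all carried out by the processors bound to the Netty client and server.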
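The processor-per-command-type pattern behind `registerProcessor` comes down to a map lookup on the message type. The sketch below illustrates it with a hypothetical subset of `CommandType`, a hypothetical `Command` record, and a hypothetical `Processor` interface. These are not the real `NettyRequestProcessor` API. It shows how an incoming message is routed to whichever handler was registered for its type.

```java
import java.util.EnumMap;
import java.util.Map;

public class ProcessorRegistrySketch {

    // Hypothetical subset of command types; the real enum is much larger.
    enum CommandType { TASK_EXECUTE_REQUEST, TASK_KILL_REQUEST, TASK_EXECUTE_RESPONSE }

    // Hypothetical message shape: a type plus an opaque body.
    record Command(CommandType type, String body) {}

    interface Processor {
        String process(Command command);
    }

    private final Map<CommandType, Processor> processors = new EnumMap<>(CommandType.class);

    void registerProcessor(CommandType type, Processor processor) {
        processors.put(type, processor);
    }

    /** What a channel handler does on each inbound message: look up by type, then dispatch. */
    String dispatch(Command command) {
        Processor p = processors.get(command.type());
        if (p == null) {
            return "no processor for " + command.type();
        }
        return p.process(command);
    }

    public static void main(String[] args) {
        ProcessorRegistrySketch worker = new ProcessorRegistrySketch();
        worker.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, c -> "execute " + c.body());
        worker.registerProcessor(CommandType.TASK_KILL_REQUEST, c -> "kill " + c.body());
        System.out.println(worker.dispatch(new Command(CommandType.TASK_EXECUTE_REQUEST, "task-1")));  // execute task-1
        System.out.println(worker.dispatch(new Command(CommandType.TASK_EXECUTE_RESPONSE, "task-1"))); // no processor for TASK_EXECUTE_RESPONSE
    }
}
```

In the real handler each processor is also paired with an executor (the "thread pool each processor runs on" in 2.4.1), so the lookup result is submitted to that pool rather than run inline as it is here.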

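The master-side code is as follows:

On startup, the master initializes its Netty server, registers the corresponding request processors with the Netty handler, and starts the server: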

 @PostConstruct
    public void run() throws SchedulerException {
        // init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(masterConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);

        // logger server
        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);

        this.nettyRemotingServer.start();

        // install task plugin
        this.taskPluginManager.installPlugin();

        // self tolerant
        this.masterRegistryClient.init();
        this.masterRegistryClient.start();
        this.masterRegistryClient.setRegistryStoppable(this);

        this.masterSchedulerService.init();
        this.masterSchedulerService.start();

        this.eventExecuteService.start();
        this.failoverExecuteThread.start();

        this.scheduler.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Stopper.isRunning()) {
                close("shutdownHook");
            }
        }));
    }
 /**
     * server start
     */
    public void start() {
        if (isStarted.compareAndSet(false, true)) {
            this.serverBootstrap
                    .group(this.bossGroup, this.workGroup)
                    .channel(NettyUtils.getServerSocketChannelClass())
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
                    .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
                    .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
                    .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
                    .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) {
                            initNettyChannel(ch);
                        }
                    });

            ChannelFuture future;
            try {
                future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
            } catch (Exception e) {
                logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
                throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
            }
            if (future.isSuccess()) {
                logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
            } else if (future.cause() != null) {
                throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
            } else {
                throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
            }
        }
    }

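When the master's NettyExecutorManager is initialized, it also initializes a NettyRemotingClient and registers the processors that handle worker callback requests. The actual connection is only made once the executor's (worker's) host and port are known: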

 /**
     * constructor
     */
    public NettyExecutorManager() {
        final NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    }
    // register the processors that handle worker callbacks
    @PostConstruct
    public void init() {
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
    }
    
 public NettyRemotingClient(final NettyClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        if (NettyUtils.useEpoll()) {
            this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
                private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
                private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
                }
            });
        }
        this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
                new CallerThreadExecutePolicy());
        this.clientHandler = new NettyClientHandler(this, callbackExecutor);

        this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));

        this.start();
    }
 /**
     * start
     */
 private void start() {

        this.bootstrap
                .group(this.workerGroup)
                .channel(NettyUtils.getSocketChannelClass())
                .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
                .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
                .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
                .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline()
                                .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
                                .addLast(new NettyDecoder(), clientHandler, encoder);
                    }
                });
        this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000, 1000, TimeUnit.MILLISECONDS);
        isStarted.compareAndSet(false, true);
    }

The task dispatch code is as follows:

/**
     * task dispatch
     *
     * @param context context
     * @return result
     * @throws ExecuteException if error throws ExecuteException
     */
    public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
        /**
         * get executor manager
         */
        ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
        if (executorManager == null) {
            throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
        }

        /**
         * host select
         */

        Host host = hostManager.select(context);
        if (StringUtils.isEmpty(host.getAddress())) {
            throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker, "
                            + "current task needs worker group %s to execute",
                    context.getCommand(),context.getWorkerGroup()));
        }
        context.setHost(host);
        executorManager.beforeExecute(context);
        try {
            /**
             * task execute
             */
            return executorManager.execute(context);
        } finally {
            executorManager.afterExecute(context);
        }
    }


/**
     * execute logic
     *
     * @param context context
     * @return result
     * @throws ExecuteException if error throws ExecuteException
     */
    @Override
    public Boolean execute(ExecutionContext context) throws ExecuteException {

        /**
         *  all nodes
         */
        Set<String> allNodes = getAllNodes(context);

        /**
         * fail nodes
         */
        Set<String> failNodeSet = new HashSet<>();

        /**
         *  build command accord executeContext
         */
        Command command = context.getCommand();

        /**
         * execute task host
         */
        Host host = context.getHost();
        boolean success = false;
        while (!success) {
            try {
                doExecute(host, command);
                success = true;
                context.setHost(host);
            } catch (ExecuteException ex) {
                logger.error(String.format("execute command : %s error", command), ex);
                try {
                    failNodeSet.add(host.getAddress());
                    Set<String> tmpAllIps = new HashSet<>(allNodes);
                    Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                    if (remained != null && remained.size() > 0) {
                        host = Host.of(remained.iterator().next());
                        logger.error("retry execute command : {} host : {}", command, host);
                    } else {
                        throw new ExecuteException("fail after try all nodes");
                    }
                } catch (Throwable t) {
                    throw new ExecuteException("fail after try all nodes");
                }
            }
        }

        return success;
    }


/**
     * execute logic
     *
     * @param host host
     * @param command command
     * @throws ExecuteException if error throws ExecuteException
     */
    public void doExecute(final Host host, final Command command) throws ExecuteException {
        /**
         * retry count,default retry 3
         */
        int retryCount = 3;
        boolean success = false;
        do {
            try {
                nettyRemotingClient.send(host, command);
                success = true;
            } catch (Exception ex) {
                logger.error(String.format("send command : %s to %s error", command, host), ex);
                retryCount--;
                ThreadUtils.sleep(100);
            }
        } while (retryCount >= 0 && !success);

        if (!success) {
            throw new ExecuteException(String.format("send command : %s to %s error", command, host));
        }
    }

  /**
     * send task
     *
     * @param host host
     * @param command command
     */
    public void send(final Host host, final Command command) throws RemotingException {
        Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        try {
            ChannelFuture future = channel.writeAndFlush(command).await();
            if (future.isSuccess()) {
                logger.debug("send command : {} , to : {} successfully.", command, host.getAddress());
            } else {
                String msg = String.format("send command : %s , to :%s failed", command, host.getAddress());
                logger.error(msg, future.cause());
                throw new RemotingException(msg);
            }
        } catch (Exception e) {
            logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
            throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
        }
    }
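The loop condition in doExecute is retryCount >= 0. As a result, a command that keeps failing is sent up to four times (one initial attempt plus three retries) before ExecuteException is thrown. The sketch below restates that loop without any dependencies and counts attempts so this behavior can be checked. Sender and sendWithRetry are hypothetical names, not DolphinScheduler APIs, and Thread.sleep stands in for ThreadUtils.sleep:

```java
// Hypothetical sketch of the retry loop in doExecute; Sender stands in for
// nettyRemotingClient.send(host, command), which throws when sending fails.
final class RetrySketch {
    interface Sender {
        void send() throws Exception;
    }

    private RetrySketch() {
    }

    /**
     * Same loop shape as doExecute: retryCount starts at `retries` and the loop
     * runs while retryCount >= 0, so a sender that always fails is called
     * retries + 1 times. Returns the number of attempts on success.
     */
    static int sendWithRetry(Sender sender, int retries, long sleepMillis) {
        int retryCount = retries;
        int attempts = 0;
        boolean success = false;
        do {
            attempts++;
            try {
                sender.send();
                success = true;
            } catch (Exception ex) {
                retryCount--;
                try {
                    Thread.sleep(sleepMillis); // ThreadUtils.sleep(100) in the original
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break; // stop retrying if the caller is interrupted
                }
            }
        } while (retryCount >= 0 && !success);
        if (!success) {
            throw new IllegalStateException("send failed after " + attempts + " attempts");
        }
        return attempts;
    }
}
```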

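The Worker-side code is as follows.

Likewise, when the Worker starts it initializes a NettyServer, registers the corresponding processors, and starts the server: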

    /**
     * worker server run
     */
    @PostConstruct
    public void run() {
        // init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(workerConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);

        // logger server
        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);

        this.nettyRemotingServer.start();

        // install task plugin
        this.taskPluginManager.installPlugin();

        // worker registry
        try {
            this.workerRegistryClient.registry();
            this.workerRegistryClient.setRegistryStoppable(this);
            Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();

            this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }

        // task execute manager
        this.workerManagerThread.start();

        // retry report task status
        this.retryReportTaskStatusThread.start();

        /*
         * registry hooks, which are called before the process exits
         */
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Stopper.isRunning()) {
                close("shutdownHook");
            }
        }));
    }
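The server routes each incoming command to the processor registered for its CommandType. A single processor can serve several types; for example, the one loggerRequestProcessor handles four log-related command types. The sketch below shows that registry idea with a plain in-memory map. It assumes string payloads, and SketchCommandType, SketchProcessor and ProcessorRegistrySketch are hypothetical names. The real server also handles Netty channels, decoding and threading, all of which are omitted here:

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for CommandType and a request processor.
enum SketchCommandType { TASK_EXECUTE_REQUEST, TASK_KILL_REQUEST, ROLL_VIEW_LOG_REQUEST, VIEW_WHOLE_LOG_REQUEST }

interface SketchProcessor {
    String process(String body);
}

final class ProcessorRegistrySketch {
    private final Map<SketchCommandType, SketchProcessor> processors = new EnumMap<>(SketchCommandType.class);

    /** Same role as registerProcessor: one entry per command type. */
    void registerProcessor(SketchCommandType type, SketchProcessor processor) {
        processors.put(type, processor);
    }

    /** Looks up the processor registered for the command type and invokes it. */
    String dispatch(SketchCommandType type, String body) {
        SketchProcessor processor = processors.get(type);
        if (processor == null) {
            throw new IllegalArgumentException("no processor registered for " + type);
        }
        return processor.process(body);
    }
}
```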

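When the callback service object (TaskCallbackService) is initialized, it also initializes the NettyRemotingClient it contains and registers the corresponding business processors: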

    public TaskCallbackService() {
        final NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
    }

The callback service then communicates with the Master through the Channel cached earlier by the other processors:

    /**
     * send result
     *
     * @param taskInstanceId taskInstanceId
     * @param command command
     */
    public void send(int taskInstanceId, Command command) {
        NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
        if (nettyRemoteChannel != null) {
            nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        // remove(taskInstanceId);
                        return;
                    }
                }
            });
        }
    }
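The idea behind send(taskInstanceId, command) is a per-task cache of the channel back to the Master that dispatched the task. The sketch below models that cache with a ConcurrentHashMap and uses a Consumer<String> in place of a Netty channel. All names here are hypothetical. Like the original send, it does nothing when no channel is cached for the task, but it returns a boolean so the caller can tell whether anything was sent:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch: taskInstanceId -> "channel" back to the dispatching Master.
final class CallbackChannelCacheSketch {
    private final Map<Integer, Consumer<String>> channels = new ConcurrentHashMap<>();

    /** Called when a task request arrives: remember how to reach its Master. */
    void cacheChannel(int taskInstanceId, Consumer<String> channel) {
        channels.put(taskInstanceId, channel);
    }

    /** Writes the command to the cached channel; silently skips unknown tasks. */
    boolean send(int taskInstanceId, String command) {
        Consumer<String> channel = channels.get(taskInstanceId);
        if (channel == null) {
            return false;
        }
        channel.accept(command);
        return true;
    }

    /** Drops the cached channel once the task no longer needs to report back. */
    void remove(int taskInstanceId) {
        channels.remove(taskInstanceId);
    }
}
```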

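2.6.2 Interaction between other services and the Master

Take the log service as an example. The front end calls the log query API. The API service looks up the task instance in the database by its ID to get the NettyServer address recorded as the task instance's host. It then builds a Netty client, sends a request to that server to fetch the log, and returns the result. The code is as follows: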

    public Result<String> queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                   @RequestParam(value = "taskInstanceId") int taskInstanceId,
                                   @RequestParam(value = "skipLineNum") int skipNum,
                                   @RequestParam(value = "limit") int limit) {
        return loggerService.queryLog(taskInstanceId, skipNum, limit);
    }

    /**
     * view log
     *
     * @param taskInstId task instance id
     * @param skipLineNum skip line number
     * @param limit limit
     * @return log string data
     */
    @Override
    @SuppressWarnings("unchecked")
    public Result<String> queryLog(int taskInstId, int skipLineNum, int limit) {

        TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);

        if (taskInstance == null) {
            return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
        }
        if (StringUtils.isBlank(taskInstance.getHost())) {
            return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL);
        }
        Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
        String log = queryLog(taskInstance,skipLineNum,limit);
        result.setData(log);
        return result;
    }

    /**
     * query log
     *
     * @param taskInstance  task instance
     * @param skipLineNum skip line number
     * @param limit       limit
     * @return log string data
     */
    private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
        Host host = Host.of(taskInstance.getHost());

        logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), taskInstance.getLogPath(),
                host.getPort());

        StringBuilder log = new StringBuilder();
        if (skipLineNum == 0) {
            String head = String.format(LOG_HEAD_FORMAT,
                    taskInstance.getLogPath(),
                    host,
                    Constants.SYSTEM_LINE_SEPARATOR);
            log.append(head);
        }

        log.append(logClient
                .rollViewLog(host.getIp(), host.getPort(), taskInstance.getLogPath(), skipLineNum, limit));

        return log.toString();
    }

    /**
     * roll view log
     *
     * @param host host
     * @param port port
     * @param path path
     * @param skipLineNum skip line number
     * @param limit limit
     * @return log content
     */
    public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) {
        logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
        RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
        String result = "";
        final Host address = new Host(host, port);
        try {
            Command command = request.convert2Command();
            Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
            if (response != null) {
                RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject(
                        response.getBody(), RollViewLogResponseCommand.class);
                return rollReviewLog.getMsg();
            }
        } catch (Exception e) {
            logger.error("roll view log error", e);
        } finally {
            this.client.closeChannel(address);
        }
        return result;
    }

    /**
     * sync send
     *
     * @param host host
     * @param command command
     * @param timeoutMillis timeoutMillis
     * @return command
     */
    public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
        final Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        final long opaque = command.getOpaque();
        final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
        channel.writeAndFlush(command).addListener(future -> {
            if (future.isSuccess()) {
                responseFuture.setSendOk(true);
                return;
            } else {
                responseFuture.setSendOk(false);
            }
            responseFuture.setCause(future.cause());
            responseFuture.putResponse(null);
            logger.error("send command {} to host {} failed", command, host);
        });
        /*
         * sync wait for result
         */
        Command result = responseFuture.waitResponse();
        if (result == null) {
            if (responseFuture.isSendOK()) {
                throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
            } else {
                throw new RemotingException(host.toString(), responseFuture.getCause());
            }
        }
        return result;
    }
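sendSync turns an asynchronous Netty write into a blocking call. It keys a ResponseFuture by the command's opaque id, waits on it, and treats a missing result as either a timeout (the write succeeded) or a send failure. The sketch below shows the same matching pattern with a CountDownLatch. It assumes string payloads, and SyncCallSketch, PendingResponse and call are hypothetical names. writeRequest stands in for channel.writeAndFlush(command), and onResponse stands in for the I/O thread that decodes the reply:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of opaque-id request/response matching.
final class SyncCallSketch {
    static final class PendingResponse {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String value) {
            response = value;
            latch.countDown();
        }

        /** Returns the response, or null on timeout or interruption. */
        String waitResponse(long timeoutMillis) {
            try {
                latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return response;
        }
    }

    private final Map<Long, PendingResponse> pending = new ConcurrentHashMap<>();

    /** Called by the I/O side when a response carrying this opaque id arrives. */
    void onResponse(long opaque, String body) {
        PendingResponse future = pending.get(opaque);
        if (future != null) {
            future.putResponse(body);
        }
    }

    /** Blocks like sendSync and throws if no response arrives in time. */
    String call(long opaque, Runnable writeRequest, long timeoutMillis) {
        PendingResponse future = new PendingResponse();
        pending.put(opaque, future); // register first so a fast reply is not lost
        try {
            writeRequest.run();
            String result = future.waitResponse(timeoutMillis);
            if (result == null) {
                throw new IllegalStateException("no response for opaque " + opaque + " within " + timeoutMillis + " ms");
            }
            return result;
        } finally {
            pending.remove(opaque); // clean up whether answered or timed out
        }
    }
}
```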

The Netty client is initialized together with the log service object (LogClientService):

    /**
     * construct client
     */
    public LogClientService() {
        this.clientConfig = new NettyClientConfig();
        this.clientConfig.setWorkerThreads(4);
        this.client = new NettyRemotingClient(clientConfig);
        this.isRunning = true;
    }

  

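2.7 Load-balancing algorithms

When the Master selects a Worker to execute a task, DolphinScheduler offers three load-balancing algorithms, and all of them use node weights: weighted random (random), smooth round-robin (roundrobin) and linear load (lowerweight). The configuration file controls which strategy is used. The default is the weight-based strategy, host-selector: lower_weight.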

    @Bean
    public HostManager hostManager() {
        HostSelector selector = masterConfig.getHostSelector();
        HostManager hostManager;
        switch (selector) {
            case RANDOM:
                hostManager = new RandomHostManager();
                break;
            case ROUND_ROBIN:
                hostManager = new RoundRobinHostManager();
                break;
            case LOWER_WEIGHT:
                hostManager = new LowerWeightHostManager();
                break;
            default:
                throw new IllegalArgumentException("unSupport selector " + selector);
        }
        beanFactory.autowireBean(hostManager);
        return hostManager;
    }

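2.7.1 Weighted random

The code makes this easier to follow. First sum all the weights and draw a random integer in [0, totalWeight). Then subtract each host's weight from that integer in turn and return the first host at which the value drops below zero. If the total weight is not positive, a host is returned uniformly at random.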

    @Override
    public HostWorker doSelect(final Collection<HostWorker> source) {

        List<HostWorker> hosts = new ArrayList<>(source);
        int size = hosts.size();
        int[] weights = new int[size];
        int totalWeight = 0;
        int index = 0;

        for (HostWorker host : hosts) {
            totalWeight += host.getHostWeight();
            weights[index] = host.getHostWeight();
            index++;
        }

        if (totalWeight > 0) {
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);

            for (int i = 0; i < size; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return hosts.get(i);
                }
            }
        }
        return hosts.get(ThreadLocalRandom.current().nextInt(size));
    }
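Put differently, each host owns a slice of [0, totalWeight) whose length equals its weight. A uniformly random offset therefore lands on host i with probability weight_i / totalWeight. The sketch below splits the cumulative-weight walk into a pure function so it can be checked with fixed offsets. WeightedRandomSketch, indexForOffset and select are hypothetical names, and weights are passed as a parallel array instead of being read from HostWorker:

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the weighted-random walk used by doSelect above.
final class WeightedRandomSketch {
    private WeightedRandomSketch() {
    }

    /** Returns the index whose weight slice contains offset, or -1 if offset >= total weight. */
    static int indexForOffset(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        return -1;
    }

    /** Picks a host with probability proportional to its weight. */
    static <T> T select(List<T> hosts, int[] weights) {
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        if (total > 0) {
            int index = indexForOffset(weights, ThreadLocalRandom.current().nextInt(total));
            if (index >= 0) {
                return hosts.get(index);
            }
        }
        // All weights are zero: fall back to a uniform pick, as doSelect does.
        return hosts.get(ThreadLocalRandom.current().nextInt(hosts.size()));
    }
}
```

With weights {2, 3, 5}, offsets 0 to 1 map to the first host, 2 to 4 to the second, and 5 to 9 to the third.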

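2.7.2 Linear load (lower weight)

Weight calculation: the weight is computed from the CPU usage, memory usage and load average that the node reports to the registry, together with its uptime. While the node is still warming up (uptime shorter than WARM_UP_TIME), the weight is multiplied by WARM_UP_TIME / uptime. That ratio is greater than 1, so a freshly started node looks more heavily loaded and is less likely to be chosen: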

    private double calculateWeight(double cpu, double memory, double loadAverage, long startTime) {
        double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
        long uptime = System.currentTimeMillis() - startTime;
        if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
            // If the warm-up is not over, add the weight
            return calculatedWeight * Constants.WARM_UP_TIME / uptime;
        }
        return calculatedWeight;
    }
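The warm-up effect is easiest to see with numbers. The sketch below repeats the formula but takes uptime as a parameter so the result is deterministic. CPU_FACTOR, MEMORY_FACTOR, LOAD_AVERAGE_FACTOR and WARM_UP_TIME appear in the original code, but their values are not shown in this article, so the values below are placeholders chosen only for illustration:

```java
// Hypothetical sketch of the lower-weight formula with warm-up.
final class LowerWeightFormulaSketch {
    static final double CPU_FACTOR = 10;              // placeholder value
    static final double MEMORY_FACTOR = 20;           // placeholder value
    static final double LOAD_AVERAGE_FACTOR = 70;     // placeholder value
    static final long WARM_UP_TIME = 10 * 60 * 1000L; // placeholder: 10 minutes

    private LowerWeightFormulaSketch() {
    }

    static double calculateWeight(double cpu, double memory, double loadAverage, long uptime) {
        double weight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
        if (uptime > 0 && uptime < WARM_UP_TIME) {
            // WARM_UP_TIME / uptime > 1: a young node looks heavier, so the
            // lower-weight selector prefers it less until warm-up ends.
            return weight * WARM_UP_TIME / uptime;
        }
        return weight;
    }
}
```

With these placeholder factors, cpu 0.5, memory 0.5 and load 1.0 give 85 once warm-up is over, and 170 for a node that is only halfway through warm-up.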

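Selection then works like an inverted smooth round-robin. On each call, every node's current weight is increased by its own weight and the node with the lowest current weight is chosen. The chosen node's current weight is then increased by the total weight, which makes it less likely to be picked again right away: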

    /**
     * select
     *
     * @param sources sources
     * @return HostWeight
     */
    @Override
    public HostWeight doSelect(Collection<HostWeight> sources) {
        double totalWeight = 0;
        double lowWeight = 0;
        HostWeight lowerNode = null;
        for (HostWeight hostWeight : sources) {
            totalWeight += hostWeight.getWeight();
            hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
            if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
                lowerNode = hostWeight;
                lowWeight = hostWeight.getCurrentWeight();
            }
        }
        lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
        return lowerNode;

    }
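To see the effect, the sketch below reproduces the doSelect logic above on a small in-memory model. LowerWeightSelectSketch and Node are hypothetical names standing in for HostWeight. With two nodes of weight 1 and 2, six consecutive calls pick A, B, A, A, B, A, so the lighter node is chosen twice as often:

```java
import java.util.List;

// Hypothetical sketch of the lower-weight selection shown above.
final class LowerWeightSelectSketch {
    static final class Node {
        final String name;
        final double weight;   // load-based weight: lower means less loaded
        double currentWeight;  // running counter, as in HostWeight

        Node(String name, double weight) {
            this.name = name;
            this.weight = weight;
        }
    }

    private LowerWeightSelectSketch() {
    }

    /** Assumes a non-empty list, as the original does. */
    static Node select(List<Node> nodes) {
        double total = 0;
        Node lowest = null;
        for (Node n : nodes) {
            total += n.weight;
            n.currentWeight += n.weight;
            if (lowest == null || n.currentWeight < lowest.currentWeight) {
                lowest = n;
            }
        }
        // Push the chosen node up by the total so the others catch up.
        lowest.currentWeight += total;
        return lowest;
    }
}
```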

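2.7.3 Smooth round-robin

This algorithm is harder to follow at first. It is the smooth weighted round-robin scheme used by Nginx. On every call, each host's current value is increased by its own weight (increaseCurrent) and the host with the largest current value is selected. The selected host's current value is then reduced by the total weight (sel(totalWeight)). Over a full cycle each host is picked in proportion to its weight, and a heavy host's picks are spread out instead of arriving in a burst. maxCurrent starts at Long.MIN_VALUE only so that the first host compared always becomes the initial candidate; there is no separate warm-up phase. The per-worker-group map keeps this state between calls. When the number of hosts no longer matches the map size, the block guarded by updateLock removes entries that have not been updated within RECYCLE_PERIOD.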

    @Override
    public HostWorker doSelect(Collection<HostWorker> source) {

        List<HostWorker> hosts = new ArrayList<>(source);
        String key = hosts.get(0).getWorkerGroup();
        ConcurrentMap<String, WeightedRoundRobin> map = workGroupWeightMap.get(key);
        if (map == null) {
            workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
            map = workGroupWeightMap.get(key);
        }

        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();
        HostWorker selectedHost = null;
        WeightedRoundRobin selectWeightRoundRobin = null;

        for (HostWorker host : hosts) {
            String workGroupHost = host.getWorkerGroup() + host.getAddress();
            WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
            int weight = host.getHostWeight();
            if (weight < 0) {
                weight = 0;
            }

            if (weightedRoundRobin == null) {
                weightedRoundRobin = new WeightedRoundRobin();
                // set weight
                weightedRoundRobin.setWeight(weight);
                map.putIfAbsent(workGroupHost, weightedRoundRobin);
                weightedRoundRobin = map.get(workGroupHost);
            }
            if (weight != weightedRoundRobin.getWeight()) {
                weightedRoundRobin.setWeight(weight);
            }

            long cur = weightedRoundRobin.increaseCurrent();
            weightedRoundRobin.setLastUpdate(now);
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedHost = host;
                selectWeightRoundRobin = weightedRoundRobin;
            }

            totalWeight += weight;
        }

        if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) {
            try {
                ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
                newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
                workGroupWeightMap.put(key, newMap);
            } finally {
                updateLock.set(false);
            }
        }

        if (selectedHost != null) {
            selectWeightRoundRobin.sel(totalWeight);
            return selectedHost;
        }

        return hosts.get(0);
    }
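The sketch below is a stripped-down version of the smooth weighted round-robin idea. It leaves out the per-worker-group map, the lastUpdate bookkeeping and the recycling under updateLock. SmoothRoundRobinSketch and its methods are hypothetical names, and the class is not thread-safe:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, single-threaded sketch of smooth weighted round-robin.
final class SmoothRoundRobinSketch {
    private final Map<String, Integer> weights = new LinkedHashMap<>();
    private final Map<String, Long> current = new LinkedHashMap<>();

    SmoothRoundRobinSketch add(String host, int weight) {
        weights.put(host, Math.max(weight, 0)); // negative weights clamp to 0, as in doSelect
        current.put(host, 0L);
        return this;
    }

    String select() {
        long total = 0;
        long maxCurrent = Long.MIN_VALUE;
        String selected = null;
        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            long cur = current.get(e.getKey()) + e.getValue(); // like increaseCurrent()
            current.put(e.getKey(), cur);
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selected = e.getKey();
            }
            total += e.getValue();
        }
        if (selected != null) {
            current.put(selected, current.get(selected) - total); // like sel(totalWeight)
        }
        return selected;
    }
}
```

With weights a=5, b=1 and c=1, the sketch yields a, a, b, a, c, a, a and then repeats the same cycle, instead of sending five requests in a row to a.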

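2.8 Log service

This was already covered in 2.6.2, so it is not repeated here.

2.9 Alerting

I have not studied this part yet. At a glance, it mostly filters data according to rules and then calls the alert service interface of the configured type to send the alert, for example by email, WeChat or SMS.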

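3 Afterword

3.1 Make friends

I have not used DolphinScheduler in production, so my understanding of the business logic may be incomplete and some interpretations may be off. You are welcome to join the community to exchange ideas and discuss.

Apache DolphinScheduler Slack channel: https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-1e36toy4n-5n9U2R__FDM05R~MJFFVBg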

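3.2 References

  1. https://dolphinscheduler.apache.org/zh-cn/development/architecture-design.html

  2. https://juejin.cn/post/6844903729406148622

  3. https://www.w3cschool.cn/quartz_doc/quartz_doc-1xbu2clr.html

Finally, thanks to community members Cai Shunfeng (蔡順峰), Zhong Jiajie (鐘嘉杰) and Ruan Wenjun (阮文俊) for their constructive suggestions on organizing and revising this article, and for their help in publishing it.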

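You are very welcome to join the DolphinScheduler family and become part of the open-source world!

We encourage every form of community participation, with the goal of eventually becoming a Committer or PPMC member. For example:

  • Report problems you run into as GitHub issues.

  • Answer issues raised by others.

  • Help improve the documentation.

  • Help add test cases to the project.

  • Add comments to the code.

  • Submit PRs that fix bugs or add features.

  • Publish articles on real-world use cases, scheduling-flow analysis, or other scheduling-related technical topics.

  • Help promote DolphinScheduler, for example by giving talks at tech conferences or meetups.

Welcome to the contributors! Getting into open source starts with your first PR.

  • For example, add code comments, or pick an issue labeled "easy to fix" or a very simple one (a typo, etc.). Use a first, simple PR to get familiar with the submission process.

Note: contributions are not limited to PRs. Anything that helps the project grow counts as a contribution.

We are confident that taking part in DolphinScheduler will help you benefit from open source!

Contribute

With the rapid rise of open source in China, the Apache DolphinScheduler community is thriving. To build a more useful and easier-to-use scheduler, we sincerely welcome everyone who loves open source to join the community. Help open source in China grow and help homegrown open source go global.

There are many ways to contribute to the DolphinScheduler community, including:

Contributing your first PR (documentation or code). We hope it is a simple one: the first PR is for getting familiar with the submission process and community collaboration, and for experiencing how friendly the community is.

The community has compiled the following list of good first issues: https://github.com/apache/dolphinscheduler/issues/5689

Issues for more experienced contributors: https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A"volunteer+wanted"

How to contribute: https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html

Come on, the DolphinScheduler community needs you. Add your brick to the rise of open source in China. Even a single small tile adds up to a great force.

Taking part in open source lets you exchange ideas with experts up close and quickly improve your skills. If you would like to contribute, we have a contributor seed-training group. Add the community assistant and they will guide you step by step. Contributors of every level are welcome and every question gets an answer; what matters is a willingness to contribute.

When adding the assistant on WeChat, please mention that you want to contribute. Come on, the community is looking forward to your participation!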
