dolphinscheduler簡單任務定義及跨節點傳參
轉載請注明出處 https://www.cnblogs.com/funnyzpc/p/16395094.html
寫在前面
dolphinscheduler是一款非常不錯的調度工具,本文我就簡稱ds啦,可單機可集群可容器,可調度sql、存盤程序、http、大資料,也可使用shell、python、java、flink等語言及工具,功能強大型別豐富,適合各類調度型任務,社區及專案也十分活躍,現在github中已有8.2k的star??
所以,本篇博文開始會逐步講一些ds相關的東西,也期待各位同行能接觸到此并能實際解決一些生產上的問題~??
一.準備作業
閱讀本博文前建議您先閱讀下官方的檔案[https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/context.html)(https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/context.html)(雖然也會碰到一些坑??)
這里,先準備下sql表資源,以下為`postgresql`的`sql`腳本:
(表結構)
CREATE TABLE dolphinscheduler.tmp (
id int4 NOT NULL,
"name" varchar(50) NULL,
"label" varchar(50) NULL,
update_time timestamp NULL,
score int4 NULL,
CONSTRAINT tmp_pkey PRIMARY KEY (id)
);
(表資料)
INSERT INTO tmp (id,"name","label",update_time,score) VALUES
(3,'二狗子','','2022-07-06 21:49:26.872',NULL),
(2,'馬云云','',NULL,NULL),
(1,'李思','','2022-07-05 19:54:31.880',85);
因為個人使用的postgresql的資料庫,如果您是mysql或者其他資料的用戶,請自行更改以上表和資料并添加到庫中即可
表及資料入庫,請將tmp所屬的庫配置到 ds后臺->資料源中心->創建資料源 ,以下是我的配置,記住,這里面的所有資料庫配置均遵守所屬資料庫型別的jdbc的driver的配置引數,配置完成也會在ds的資料庫生成一條jdbc的連接地址,這點要明白~

二.簡單的專案創建及說明
因為`ds`的任務是配置在專案下面,所以第一步得新建一個專案,這樣:`ds后臺`->`專案管理`->`創建專案`,這是我創建的,請看:

準備完專案之后,滑鼠點進去,并進入到 作業流定義選單 頁面,如下圖:

先簡單到解釋下ds的一點兒基本結構,首先,ds一般部署在linux服務器下,創建任務的用戶需要在admin賬戶下創建,重要的是創建的每個作業賬戶需要與作業系統用戶一一對應,比如你創建了一個 test 的ds賬戶,那ds所在的服務器也必須有一個test的賬戶才可行,這是ds的規則,我沒法解釋為什么,
每個用戶下(除了admin外)所能創建的調度任務均在各自創建的專案下,每個專案又分為多個任務(作業流定義),一個任務下又可分為多個任務節點,下圖為任務定義:

ok,如果已經準備好以上步驟,下面開始定義一個簡單的調度任務,繼續哈~
三.簡單的引數傳遞
先看表:

我們先做個簡單的,比如圖中,如果二狗子的本名叫:李思,需要我們取id=1的name放到id=3的label中,并且更新update_time
-
1.這里第一步 在作業流定義串列,點擊
創建作業流就進入一個具體的任務(作業流)的定義,同時我們使用的是sql任務,所以就需要從左側拖動一個sql任務到畫布中(右側空白處):

因為拖動sql任務到畫布會自動彈出節點定義,上圖為當前節點的一個定義,重點是:資料源、sql型別、sql陳述句,如官方所說,如果將name傳遞到下游,則需要在自定義引數重定義這個name為out方向型別為varchar, -
2.因為傳遞到引數需要寫入到表,這里我們再定義一個節點,這個節點負責接收上游傳遞到
name,執行update時使用這個name,以下是我的定義:

看到沒,這里不僅僅要注意sql型別(sql型別與sql陳述句是一一對應的,型別不能錯) ,還有就是前置任務一定要選中(上面定義的)node1節點,
另外,需要注意的是當前任務是上下游傳參,所以在node2中是直接使用node1中定義的name這個引數哈 -
3.定義完成當前任務就需要保存:點右上角保存,填寫并保存后點關閉以退出定義:

-
4.因為定義的任務需要上線了才可執行,所以,在作業流定義串列先點該任務的
黃色按鈕(任務上線),然后才是點綠色按鈕(執行任務):

-
5.任務執行成功與否,具體得看任務實體,這是執行
node2節點的日志:

順帶再看看資料庫表是否真實成功:

完美??
四.復雜的跨節點傳參
首先看表:

思考一個問題:可以看到李思的score是85,根據score應該被評為 B(>=90的為A)并寫入到label欄位,該怎么辦呢,如果這個分數是90分又該怎么辦呢,如果根本沒有score(分值) 這個任務是不是就不需要更新李思的label(評分)呢?
對于上面問題可以有一些偏門的解決方法,比如在sql中塞一個例外值,這樣看似不錯,不過作為調度工具建議還是在condition節點或者switch節點處理是最好的,不過就目前我用的2.0.5版本的ds對于這兩類任務節點是沒法接收引數的,這是一個遺憾;遂~個人覺得較好的方式是在寫入節點之前增加一個判斷節點,將錯誤拋出(沒有score的)最好~,對于此,我使用了一個shell的中間節點,
下面是我定義的三個節點:
-
node1節點定義:

-
node2節點定義:

(腳本內容)
#!/bin/bash
echo "=====>input param start<====="
echo "id=${id}"
echo "score=${score}"
echo "=====>input param end<====="
id=${id}
echo '${setValue(id2='$id')}'
if [ "${score}" -ge "90" ];then
echo '${setValue(label2=level A)}'
echo "level A"
elif [ "${score}" -ge "80" ];then
echo '${setValue(label2=level B)}'
echo "level B"
elif [ "${score}" -ge "60" ];then
echo '${setValue(label2=level C)}'
echo "level C"
elif [ "${score}" -ge "0" ];then
echo '${setValue(label2=F!)}'
echo "F!"
else
echo "NO score ,please check!"
exit 1
fi
-
node3節點定義:

-
看一眼結果??:

五.中間的坑
對于復雜節點傳引數也碰到一些坑,這些坑大概有這些:
- 1.對于
shell腳本不熟悉的,判斷節點其實還是有一些難度的,這是很重要的一點 - 2.
node2(判斷節點)不能有重復的引數,不管區域的還是node1(上一級)傳遞過來的,均不能重復 - 3.因為在
node2(判斷節點)需要將id以及label繼續往下傳(tonode3),這時候就需要給id以及label定義一個映射的out變數(id2、label2) - 4.
node2中重新設定引數麻煩,需要在shell中重新定義變數(id2、label2),同時需要在shell任務內使用拼接的方式賦值(如:echo '${setValue(id2='$id')}') - 5.
sql型別以及不同節點下不同引數時常搞錯,不是任何節點都可以接收上級節點引數,以及區域變數與傳遞變數以及全域變數優先級區別及可能造成沖突 - 6.ds
串列傳參(2.0是不可以的)很雞肋,對于串列傳參又不能在下一級節點做回圈賦值,這點對于ds是有改進的空間的 - 7.等等...
[email protected]對于
ds還有很多可擴展的地方(因為實際需要),所以我就做了一些二次開發??,后面會聊...大家期待喲??
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/498936.html
標籤:其他
上一篇:如何為visio擴展云服務圖示
