2.3 NiFi Processor應用介紹
對于NiFi的使用者來說,如果想要創建一個高效的資料流,那么就需要了解什么樣的單元處理器才最適合這個資料流,NiFi擁有大量的可以用于各種業務場景的單元處理器可供使用者挑選和使用,這些單元處理器主要提供例如系統之間資料的傳輸,資料的路由,資料的轉換、處理、分割和聚合等大類的功能,
在每個NiFi的版本發布中都會有大量的新的處理器單元產生,這就導致本書中講重點介紹1.4.0版本及之前的常用處理器單元的功能,我們講根據這些常用的處理器單元的不同用處進行分類,
2.3.1 資料轉換類處理器單元
CompressContent
CompressContent處理器單元主要用途是對NiFi資料流的FlowFile的內容進行壓縮和解壓縮,支持的壓縮種類如圖
?
ConvertCharacterSet
ConvertCharacterSet處理器單元主要用途將NiFi資料流的FlowFile的內容從一種字符集轉換成另外一種字符集,配置例子如圖
?
EncryptContent
EncryptContent處理器單元主要用途將NiFi資料流的FlowFile的內容進行加密/解密傳輸,
?
ReplaceText
ReplaceText處理器單元主要用途是根據處理器屬性配置的正則運算式對FlowFile的內容進行匹配,如果匹配成功將會降匹配成功的欄位替換為配置屬性中的欄位,將FlowFile的內容全部替換為nifi的配置例子如圖
?
?
2.3.2 資料路由類和調制處理器單元
ControlRate
ControlRate處理器單元用來控制資料流部分流量的速率,
?
上面的圖中的例子表示1分鐘內只允許最多1000個FlowFile流過,
DetectDuplicate
DetectDuplicate處理器單元用來依據用戶定義的特征來監控和發現重復的FlowFile,通常這個處理器會搭配HashContent單元處理器來完成功能,
?
上面的圖中的例子表示Processor根據輸入的FlowFile的hash.value屬性值作為去重條件對FlowFile進行匹配,將去重后的映射到non-duplicate的Relationship中,將重復的FlowFile映射到duplicate的Relationship中,
MonitorActivity
MonitorActivity處理器單元可以在用戶定義的時段內如果沒有資料流量就是發送告警通知,也可以選擇附加功能,在資料流量恢復之后發送恢復通知,
?
上面的圖中例子標示Processor每1分鐘內沒有FlowFile輸入就會不間斷的發出Inactivity Message屬性的內容,且檢測范圍是本Node節點,
RouteOnAttribute
RouteOnAttribute處理器單元可以根據FlowFile的屬性制定路由規則來對FlowFile進行路由,
?
上面的圖中例子表示Processor根據輸入的FlowFile的value屬性進行路由,將含有hello的FlowFile路由到include hello text的Relationship中,將含有world的FlowFile路由到include world text的Relationship中,
ScanAttribute
ScanAttribute處理器單元用途是將FlowFile屬性中被用戶定義的屬性與用戶自定義的字典進行對比,看是否能夠匹配,
?
上面的圖中例子表示Processor輸入的FlowFile中的屬性值只要有一個包含了Sample.txt字典中任意一行的字符,那么Processor就會將這個FlowFile路由到matched的Relationship中,
RouteOnContent
RouteOnContent處理器單元的功能近似于RouteOnAttribute,區別在于RouteOnContent處理器單元進行路由判定的內容是FlowFile的內容而不是之前RouteOnAttribute處理器單元所使用的屬性,
?
上面的圖中例子表示Processor根據輸入FlowFile的內容進行路由,如果輸入的FlowFile的內容為hello,那么它將會被路由到hello relationship的relationship中,
ScanContent
ScanContent處理器單元同樣也近似于ScanAttribute,區別在于前者用戶選取的比對物件是內容而后者定義卻是屬性,
?
上面的圖中例子表示Processor根據輸入FlowFile內容進行掃描路由,如果FlowFile的內容為hello,那么它將會被路由到matched的relationship中,
ValidateXml
ValidateXml處理器單元將FlowFile的XML內容和用戶的XML定義進行校驗,將符合XML定義的FlowFile進行路由,
?
上面的圖中表示Processor根據輸入XML的Schema檔案對輸入的FlowFile內容進行校驗匹配,如果校驗合格的FlowFile會被映射到valid的relationship中,
上面的圖中表示Processor根據輸入的FlowFile中的HiveQL往Hive中寫入或者更新資料,
2.3.3 資料接入類處理器單元
ConvertJSONToSQL
ConvertJSONToSQL處理器單元可以將結構化的Json轉換成INSERT或者UPDATE這樣命令的SQL,配合PutSQL處理器單元可以直接根據這鞋命令將資料插入資料庫中,
?
上面的圖中表示Processor根據輸入的FlowFile的JSON內容,將JSON轉化成Update的SQL陳述句,
ExecuteSQL
ExecuteSQL處理器單元直接運行運行用戶配置的SQL查詢陳述句,并將查詢結果以Avro的格式寫入到FlowFile的內容中去,
?
上面的圖中表示Processor根據用戶配置SQL select query陳述句,從資料庫中查詢出結果,并將結果FlowFile映射到success的relationship中,
PutSQL
PutSQL處理器單元可以根據傳入的FlowFile內容中的DDM SQL對資料庫進行更新操作,
?
上面的圖中表示Processor根據輸入的FlowFile的SQL內容,每100個SQL作為一個事務提交資料庫,并將生成的Key回傳且在事務提交失敗的情況下對事務進行回滾,
SelectHiveQL
SelectHiveQL處理器單元執行Hive的查詢陳述句HiveQL,并且將結果以Avro或者CSV的格式寫入到FlowFile中,
?
上面的圖中表示Processor根據HiveQL陳述句查詢Hive,并將結果以CSV格式輸出,CSV擁有Header為username和age,
PutHiveQL
PutHiveQL處理器根據傳入的HiveQL DDM陳述句對Hive資料倉庫的內容進行更新,
?
上面的圖中表示Processor根據輸入的FlowFile中的HiveQL往Hive中寫入或者更新資料,
2.3.4 屬性提取類處理器單元
EvaluateJsonPath
EvaluateJsonPath處理器單元根據用戶定義的JSONpath運算式對FlowFile的JSON內容進行決議,將這些運算式所決議出來的內容替換FlowFile的內容或者將其更新到FlowFile的屬性中,以便于后續的單元處理器的參考,
?
上面的圖中表示Processor將輸入內容為JSON格式的FlowFile例如{“name”:”zhangsan”,”phone”:”13734564321”},將其中的phone決議出來后輸出到FlowFile的內容中,
EvaluateXPath
EvaluateXPath處理器單元功能近似于EvaluateJsonPath,根據用戶提供的XPath運算式,將FlowFile的XML內容用運算式進行決議,將決議的結果替換如FlowFile的正文或者更新FlowFile的屬性,
?
上面的圖中表示Processor對輸入內容為XML格式的FlowFile利用配置XPath運算式進行決議,并將結果輸出到FlowFile的內容中,
EvaluateXQuery
EvaluateXQuery處理器單元根據用戶定義的XQuery,將FlowFile的XML正文與運算式進行進行虬枝,將提取的結果替換FlowFile的正文或者更新FlowFile的屬性,
?
上面的圖中表示Processor對輸入內容為XML格式的FlowFile利用配置的XQuery運算式進行決議,并將結果以XML格式輸入到FlowFile的內容中,
HashAttribute
HashAttribute處理器單元對用戶選擇的已有屬性串列的值拼接后的字串進行Hash計算,
?
上面的圖中表示Processor對輸入的FlowFile中value屬性值進行hash計算,并將結果輸出到FlowFile的value屬性中,
HashContent
HashContent處理器單元對FlowFile的內容進行Hash,并將Hash值添加到FlowFile的屬性中,
?
上面的圖中表示Processor對輸入的FlowFile中的內容進行hash計算,并將結果輸出到FlowFile的hash.value屬性中,
IdentifyMimeType
IdentifyMimeType處理器單元對FlowFile的內容格式進行判定,此處理器能夠檢測許多不同的MIME型別,例如它能夠判定出FlowFile的內容是圖片,文本和壓縮檔案等格式,
UpdateAttribute
UpdateAttribute處理器單元可以對FlowFile添加任意的用戶定義的屬性,這將有利于對FlowFile添加靜態的屬性,也可以根據NiFi運算式語言來動態的添加屬性,
?
上面的圖中表示Processor對輸入的FlowFile中屬性進行修改,添加一個鍵值為value值為helloworld的屬性,
2.3.5 系統互動類處理器單元
ExecuteProcess
ExecuteProcess處理器單元能夠運行用戶定義的作業系統命令,將處理完的標準輸出內容寫入flowfile中,該處理器是一個不需要輸入的源處理器,它會輸出產生一個新的FlowFile,如果需要提供輸入源請使用下面介紹的executestreamcommand處理器單元,
?
上面的圖中表示Processor根據輸入的指令和引數執行命令ls –l /user,并將結果輸出到FlowFile中,
ExecuteStreamCommand
ExecuteStreamCommand處理器單元運行用戶定義的作業系統命令,輸入的FlowFile的內容作為命令的標準輸入,將處理完的標準輸出內容寫入FlowFile內容中,此處理器單元不同于ExecuteProcess,它必須有FlowFile的輸入才能正常完成功能,
?
上面的圖中表示Processor根據輸入的FlowFile中的path屬性值為/usr/cmd.sh命令腳本的運行引數來運行命令,并將結果輸出到FlowFile中,
2.3.6 資料提取類處理器單元
GetFile
GetFile處理器單元從本地磁盤獲取檔案的內容到NiFi,并洗掉原有的磁盤檔案,這個處理器應用場景是將一個檔案從一個地方搬移到另外一個地方而不是對檔案的拷貝,
?
上面的圖中表示Processor將/user/sample.txt檔案的內容輸出到FlowFile的內容中,
GetFTP
GetFTP處理器單元從FTP服務器檔案內容輸出到FlowFile中,并可以選擇洗掉原有檔案,同樣它的使用場景是檔案的搬移而不是檔案的拷貝,
?
上面的圖中表示Processor將ftpServer01上/resource路徑下的檔案內容輸出到FlowFile中,并將源檔案洗掉,
GetSFTP
GetSFTP處理器單元從SFTP檔案內容輸出到FlowFile中,并可以選擇洗掉原有檔案,同樣它的使用場景是檔案的搬移而不是檔案的拷貝,
?
上面的圖中表示Processor將sftpServer01上/resource路徑下的檔案內容輸出到FlowFile中,并將源檔案洗掉,
GetJMSQueue
GetJMSQueue處理器單元從JMS佇列中下載訊息,并通過JMS Message來創建FlowFile的內容,同時也可以指定創建FlowFile的屬性,
?
GetJMSTopic
GetJMSTopic處理器單元從JMS的Topic中下載訊息,并根據JMS訊息創建FlowFile的內容,通過選擇也能生成FlowFile的屬性,這個處理器單元支持長期和非長期的訂閱模式,
GetHTTP
GetHTTP處理器單元能夠根據URL通過HTTP或者HTTPS協議下載內容到NiFi,從而形成的新的FlowFile內容,同時處理器單元在下載的同時也記憶ETag和最新修改時間來防止資料的重復下載問題,
?
上面的圖中表示Processor根據配置的URL進行http訪問,將訪問結果發送到FlowFile的內容中并且filename屬性值為配置的Filename的值,
ListenHTTP
ListenHTTP處理器單元啟動一個HTTP或者HTTPS監聽埠,當監聽到有POST請求過來的時候,會首先回傳200狀態,并利用POST的請求內容形成新的FlowFile,
?
上面的圖中表示Processor監聽8811埠的HTTP POST請求,當有POST請求訪問http://localhost:8811/contentListener的時候,Processor就會首先回傳200狀態,讓后將POST請求的引數輸出到新的FlowFile的內容中,
ListenUDP
ListenUDP處理器單元監聽UDP資料包,并根據配置獲取一定量的包來創建一個FlowFile并將FlowFile發射到success的Relationships關系中,
?
GetHDFS
GetHDFS處理器單元監控用戶定義的HDFS指定路徑的檔案變化,當有新的檔案寫入HDFS中的該路徑下,那么檔案的內容被用來創建新的FlowFile的內容,同時洗掉原有的檔案,這個處理器同前面一樣適用于檔案的搬移場景而非檔案的復制場景,
?
上面的圖中表示Processor將HDFS上/target路徑下的檔案內容輸出到FlowFile中,并將源檔案洗掉,
2.3.7資料發送類處理器單元
PutEmail
PutEmail處理器單元主要功能是將FlowFile的內容以郵件的形式發送給配置的用戶郵箱,也可以通過配置選擇將FlowFile的內容以附件的方式發送出去,
?
PutFile
PutFile處理器主要功能是將FlowFile的內容以檔案的形式寫入本地磁盤,
?
上面的圖中表示Processor將接收到的FlowFile的內容寫入到本地的磁盤檔案中,(注意:1.5.0之前此Processor不支持追加寫入)
PutFTP
PutFTP處理器單元將FlowFile的內容拷貝到遠程的FTP服務器上,
?
上面的圖中表示Processor將輸入的FlowFile的內容通過FTP協議寫入到ftpServer01的/upload路徑下且上傳路徑不存在的情況下自動創建路徑,
PutSFTP
PutSFTP處理器單元主要功能將FlowFile的內容拷貝到遠程的SFTP服務器上,
?
上面的圖中表示Processor將輸入的FlowFile的內容通過SFTP協議寫入到sftpServer01的/upload路徑下且上傳路徑不存在的情況下自動創建路徑,
PutJMS
PutJMS處理器單元主要功能將FlowFile的內容座位JMS訊息發送到JMS代理上,也可以通過配置根據FlowFile的屬性來添加JMS配置屬性,
?
PutSQL
PutSQL處理器單元的主要功能是將FlowFile的正文當作SQL DDL宣告,FlowFile必須是正確的符合SQL規范的SQL宣告,FlowFile的屬性被用作DDL SQL的引數,這樣可以有效的防止SQL注入攻擊,
?
上面的圖中表示Processor將輸入的FlowFile的內容按照100個進行batch操作寫入資料庫,
PutKafka
PutKafka處理器單元專門是針對0.8.x版本的Kafka,它將FlowFile的內容以訊息的形式發送到Kafka訊息佇列中,FlowFile的內容既可以作為一條完整的訊息發送到Kafka,同時也可以通過分隔符將它切分為多個訊息來發送到Kafka,例如換行符,
?
上面的圖中表示Processor從localhost安裝的Kafka的Sample_topic_A消費資料,并將資料輸出到FlowFile的內容中,
PutMongo
PutMongo處理器單元將FlowFile的內容插入或者更新到MongoDB中,
?
上面的圖中表示Processor根據輸入的FlowFile內容中的doc來寫入MongoDB,
2.3.8切分和聚合類處理器單元
SplitText
SplitText處理器單元可以將一個文本內容的FlowFile切分成你想要數量的FlowFile,
?
上面的圖中表示Processor將輸入的FlowFile的內容切分成多個FlowFile,每個FlowFile的內容都來自于FlowFile中的一行內容,
SplitJson
SplitJson處理器單元可以將一個JSON物件根據它的結構拆解成JSON內部的字物件,
?
上面的圖中表示Processor將輸入的FlowFile內容中的Json按照JsonPath運算式$.*進行第一級切分生成新的FlowFile,
SplitXml
SplitXml處理器單元可以將XML訊息分解為多個FlowFile,且新的FlowFile中包含原有的分段資訊,這種處理器單元經常適用于多個XML元素被封裝在一個元素中,而此處理器單元允許這些元素分離成各自單獨的XML元素,
?
上面的圖中表示Processor對于輸入的FlowFile內容中的XML按照第一層級進行切分,切分出來的子XML輸出到FlowFile中,
UnpackContent
UnpackContent處理器單元可以對壓縮格式的檔案如ZIP和TAR進行解壓,且解壓后的檔案作為一個FlowFile的內容輸出,
?
上圖中UnpackContent和IdentifyMimeType一起使用,后者輸出的FlowFile由前者來進行處理,UnpackContent根據輸入的FlowFile的mime.type屬性對FlowFile的內容進行解壓,
MergeContent
MergeContent處理器單元的主要功能是將多個FlowFile的內容合并成一個FlowFile,這些FlowFile的內容合并的同時,也可以通過配置對合并后的內容增加標題,頁腳和分隔符,也可以對合并后的內容置頂歸檔格式,比如ZIP和TAR,在FlowFile合并的程序中可以依據相同的屬性進行合并,也可以根據之前分片處理器分片后的序號來進行合并,用戶可以定義合并后FlowFile內容的最大值和最小值,當達到這個值的時候FlowFile就合并完畢,為了防止在FlowFile沒有達到配置的大小值的程序中時間太久,用戶也可以通過配置超時引數來有效的解決這個問題,
?
上圖中表示Processor將輸入的FlowFile的內容按照從Queue中任意消費的FlowFile的內容進行Merge輸出到新的FlowFile中,FlowFile的內容格式為TAR,選擇各個輸入FlowFile中一致的屬性寫入到新輸出的FlowFile中,對于不同的Metadata不進行Merge,輸出的新的FlowFile內容中同事也增加了頁頭和頁腳,
SegmentContent
SegmentContent處理器單元可以根據配置切分后的FlowFile大小將一個大的FlowFile切分成許多小的FlowFile,分片是基于位元組的偏移量而不是分隔符,這種將大的FlowFile以分片的形式進行傳輸可以有效的減少大檔案傳輸程序中的延時問題,當這些分片傳輸到達目標端的時候,可以通過其它的處理器單元重新進行組裝,例如上面所說的MergeContent處理器單元,
?
上面的途中表示Processor把輸入的FlowFile的內容按照1MB的大小進行切分,切分成新的FlowFile且新的FlowFile中寫入了分片的序號segment.index和數量segment.count屬性,
SplitContent
SplitContent處理器單元的功能近似于SegmentContent將一個FlowFile分解成多個FlowFile,但區別在于SplitContent在進行分解的程序中不是按照設定的位元組大小,而是根據分隔符進行分裂,
?
上面的圖中表示Processor對輸入的FlowFile的內容按照豎線 | 符號進行切分,切分成多個FlowFile,
2.3.9 HTTP協議類處理器單元
GetHTTP
GetHTTP處理器單元對配置的http或者https協議的URL發起請求并將回傳結果輸出到新的FlowFile中,而且GetHTTP會記錄Etag和最新資料修改時間避免不停的訪問給服務端產生不必要的開銷,如下圖
?
ListenHTTP
ListenHTTP處理器單元監聽Http或者Https請求,如果有請求先回傳200然后將POST的請求引數輸出到新的FlowFile中,
?
上面的圖中表示Processor監聽locahost的http請求,請求URL為http://localhost:9080/contentListener
InvokeHTTP
InvokeHTTP處理器單元能夠根據用戶的配置發送HTTP協議請求,InvokeHTTP處理器單元通過更多的配置可以完成比GetHTTP和PostHTTP更多的功能,如下圖
?
PostHTTP
PostHTTP處理器單元將FlowFile的內容作為HTTP POST請求的body訊息,它通常與ListenHTTP處理器單元組合使用,應用于當多個NiFi實體之間不能通過Site-to-Site的方式進行資料交換的場景,如下圖
?
HandleHttpRequest / HandleHttpResponse
HandleHttpRequest處理器單元可以作為一個源處理器單元來啟動一個HTTP監聽服務功能,類似于ListenHTTP,但是這個處理器不回應客戶端,它將請求的引數以FlowFile的內容和屬性的方式,響資料流的下游進行傳遞,HandleHttpResponse處理器單元能夠回應并將處理后的FlowFile結果回傳請求的客戶端,這兩個處理器通常都是在一起被使用的,
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/22826.html
標籤:大數據
