💦今天我們來學習flink中較為基礎的DataStream API,DataStream API用來處理流資料,本文主要是以pyflink的形式來進行講解,對往期內容感興趣的小伙伴👇:
- hadoop專題: hadoop系列文章.
- spark專題: spark系列文章.
- flink專題: Flink系列文章.
💛本博客的API都是python的,根據流資料處理的不同階段,去官方的pyflink檔案中尋找對應的python API 總結而成,如有遺漏的地方,請大家指正,
目錄
- 1. 安裝pyflink
- 2. DataStream API
- 2.1 DataSources資料輸入
- 2.2 DataSteam轉換操作
- 2.3 DataSinks資料輸出
- 3. DataSet
- 4. 參考資料
1. 安裝pyflink
Flink支持python3.6、3.7和3.8,同時Flink1.11以后也支持windows系統了,大家只要直接運行命令即可安裝,
#安裝命令
python3 -m pip install apache-flink -i https://pypi.tuna.tsinghua.edu.cn/simple/
我是在ubuntu中安裝的,記得安裝java8或11哦,出現如下界面即成功了,

2. DataStream API
DataStream API是Flink框架處理無界資料流的重要介面,前面提到,任何一個完整的Flink應用程式應該包含如下三個部分:
- 資料源(DataSource),
- 轉換操作(Transformation),
- 資料匯(DataSink),
2.1 DataSources資料輸入
- 從檔案讀取資料
env.read_text_file(file_path: str, charset_name: str = 'UTF-8')
- 從集合Collection中讀取資料
env.from_collection(collection: List[Any], type_info: pyflink.common.typeinfo.TypeInformation = None)
- 自定義資料源
env.add_source(source_func: pyflink.datastream.functions.SourceFunction, source_name: str = 'Custom Source', type_info: pyflink.common.typeinfo.TypeInformation = None)
- 還支持其他的資料源,上面幾種較為常見,
2.2 DataSteam轉換操作
當Flink應用程式生成資料源后,就需要根據業務需求,通過一系列轉換操作對資料流上的元素進行各種計算,從而輸出最終的結果,
- map
有時候,我們需要對資料流上的每個元素進行處理,比如將單個文本轉換成一個元組,即1對1的轉換操作,此時可以通過map轉換操作完成,
datastreamsource.map(func, output_type)
#Parameters
#func – The MapFunction that is called for each element of the DataStream.
#output_type – The type information of the MapFunction output data.
#Returns
#The transformed DataStream.
- flat_map
在某些情況下,需要對資料流中每個元素生成多個輸出,即1對N的轉換操作,那么此時可以利用flatMap操作,
datastreamsource.flat_map(func, output_type)
#Parameters
#func – The FlatMapFunction that is called for each element of the DataStream.
#output_type – The type information of output data.
#Returns
#The transformed DataStream.
- fliter
有時要從資料流中篩選出符合預期的資料,那就需要對資料流進行過濾處理,即利用filter轉換操作,
datastreamsource.filter(func)
#Parameters
#func – The FilterFunction that is called for each element of the DataStream.
#Returns
#The filtered DataStream.
- key_by
針對不同的資料流元素,有時需要根據某些欄位值,作為磁區的Key來并行處理資料,此時就需要用到keyBy轉換操作,它將一個DataStream型別的資料流轉換成一個KeyedStream資料流型別
datastreamsource.key_by(key_selector,key_type)
#Parameters
#key_selector – The KeySelector to be used for extracting the key for partitioning.
#key_type – The type information describing the key type.
#Returns
#The DataStream with partitioned state(i.e. KeyedStream).
- reduce
對于磁區的資料流,對資料進行reduce處理,它實際上是一種聚合操作,將兩個輸入元素合并成一個輸出元素,它是KeyedStream流上的操作
datastreamsource.reduce(func)
#Parameters
#func – The ReduceFunction that is called for each element of the DataStream.
#Returns
#The transformed DataStream.
例如:
ds = env.from_collection([(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b'])
ds.key_by(lambda x: x[1]).reduce(lambda a, b: a[0] + b[0], b[1])
- union
在流操作場景中,有時需要合并多個流,即將多個資料流合并成一個資料流,此時可以使用union轉換操作(最多合并3個)
#流1合并2,3
datastreamsource1.union(datastreamsource2,datastreamsource3)
#Parameters
#datastreamsource – The DataStream to union outputwith.
#Returns The DataStream.
- connect
除了union可以合并流,還可以使用connect對2個資料流進行合并,且兩個流的資料型別可以不相同,
datastreamsource.connect(ds)
#Parameters
#ds – The DataStream with which this stream will be connected.
#Returns
#The ConnectedStreams.
- project
#dataStreamSource.project(1, 0)方法從資料源dataStreamSource中篩選出2個欄位,其欄位索引分別是1和0,此時列也重新進行排序,
datastreamsource.project(*field_indexes: int)
#Parameters
#field_indexes – The field indexes of the input tuples that are retained. The order of fields in the output tuple corresponds to the order of field indexes.
#Returns
#The projected DataStream.
- partition_custom
partition_custom轉換操作可以根據自身需要,自行制定磁區規則,partitionCustom只能對單個Key進行磁區,不支持復合Key,
datastreamsource.partition_custom(partitioner, key_selector)
#Parameters
#partitioner – The partitioner to assign partitions to keys.
#key_selector – The KeySelector with which the DataStream is partitioned.
#Returns
#The partitioned DataStream.
- window轉換操作
Flink通過window機制,將無界資料流劃分成多個有界的資料流,從而對有界資料流進行資料統計分析,window上還有多種轉換操作,如max求視窗最大值,sum求視窗中元素和等,當視窗中的內置轉換操作不能滿足業務需求時,可以自定義內部的處理邏輯,即用apply方法傳入一個自定義的WindowFunction
#CountWindow將datastream分成幾個視窗
datastreamsource.CountWindow(id: int)
2.3 DataSinks資料輸出
當資料流經過一系列的轉換后,需要將計算結果進行輸出,那么負責輸出結果的算子稱為Sink,
- sink_to
datastreamsource.sink_to(sink: pyflink.datastream.connectors.Sink)
#Adds the given sink to this DataStream. Only streams with sinks added will be executed once the execute() method is called.
#Parameters
#sink – The user defined sink.
#Returns
#The closed DataStream.
- add_sink
datastreamsource.add_sink(sink_func: pyflink.datastream.functions.SinkFunction)
#Adds the given sink to this DataStream. Only streams with sinks added will be executed once the StreamExecutionEnvironment.execute() method is called.
#Parameters
#sink_func – The SinkFunction object.
#Returns
#The closed DataStream.
3. DataSet
上面的部分,我們主要講述了流處理DataStream的DataSource資料源、DataStream轉換操作以及DataSink資料匯,在Flink中將批資料稱為DataSet,關于批資料的處理總結如下:
- 資料源:和DataStream相似
- 轉換操作:參考spark的批處理api
- 資料匯:和DataStream相似
DataSet在這里就不做過多講述,
4. 參考資料
《PyDocs》(pyflink官方檔案)
《Flink入門與實戰》
《Kafka權威指南》
《Apache Flink 必知必會》
《Apache Flink 零基礎入門》
《Flink 基礎教程》
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/426530.html
標籤:其他
