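To process both bounded and unbounded data sets, Flink provides the DataSet API and the DataStream API respectively, and we can write Java or Scala programs against them. Below are some of the basic operators in the DataSet API.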

The following code demonstrates what each operator does.
1. Map, FlatMap and MapPartition
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

public class MapFlatMapMapPartitionDemo {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<String> data = new ArrayList<String>();
        data.add("I love Beijing");
        data.add("I love China");
        data.add("Beijing is the capital of China");
        DataSource<String> text = env.fromCollection(data);

        // map: exactly one output element per input line (here, the list of its words)
        DataSet<List<String>> mapData = text.map(new MapFunction<String, List<String>>() {
            public List<String> map(String data) throws Exception {
                String[] words = data.split(" ");
                // Create a List
                List<String> result = new ArrayList<String>();
                for (String w : words) {
                    result.add(w);
                }
                return result;
            }
        });
        mapData.print();
        System.out.println("*****************************************");

        // flatMap: any number of output elements per input line (here, one per word)
        DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {
            public void flatMap(String data, Collector<String> collection) throws Exception {
                String[] words = data.split(" ");
                for (String w : words) {
                    collection.collect(w);
                }
            }
        });
        flatMapData.print();
        System.out.println("*****************************************");

        /* new MapPartitionFunction<String, String>
           First String: type of the elements in the partition
           Second String: type of the elements after processing */
        DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {
            public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
                // The benefit of working per partition: for database access, for example,
                // only one Connection needs to be created per partition
                // values holds all the data of one partition
                Iterator<String> it = values.iterator();
                while (it.hasNext()) {
                    String next = it.next();
                    String[] split = next.split(" ");
                    for (String word : split) {
                        out.collect(word);
                    }
                }
                // Close the connection here
            }
        });
        mapPartitionData.print();
    }
}
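The difference between the three operators comes down to how often the user function is called and how many elements it may emit. The following is a plain-Java sketch of those semantics only (it does not use Flink); the class `OperatorSketch` and the two-partition split are hypothetical, chosen just to show that per-partition setup runs once per partition rather than once per element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java model of map, flatMap and mapPartition semantics (no Flink involved).
final class OperatorSketch {
    // map: exactly one output element per input element.
    static List<List<String>> map(List<String> lines) {
        List<List<String>> out = new ArrayList<>();
        for (String line : lines) {
            out.add(Arrays.asList(line.split(" ")));
        }
        return out;
    }

    // flatMap: zero or more output elements per input element.
    static List<String> flatMap(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            out.addAll(Arrays.asList(line.split(" ")));
        }
        return out;
    }

    // mapPartition: called once per partition with all of its elements, so the
    // setup step (counted in setups) runs once per partition, not per element.
    static List<String> mapPartition(List<List<String>> partitions, AtomicInteger setups) {
        List<String> out = new ArrayList<>();
        for (List<String> partition : partitions) {
            setups.incrementAndGet(); // stands in for opening one connection
            for (String line : partition) {
                out.addAll(Arrays.asList(line.split(" ")));
            }
            // ...and the connection would be closed here
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "I love Beijing", "I love China", "Beijing is the capital of China");
        System.out.println(map(lines).size());     // 3
        System.out.println(flatMap(lines).size()); // 12
        AtomicInteger setups = new AtomicInteger();
        // Two hypothetical partitions: the first two lines and the last line
        List<String> words = mapPartition(
                Arrays.asList(lines.subList(0, 2), lines.subList(2, 3)), setups);
        System.out.println(words.size() + " words, " + setups.get() + " setups"); // 12 words, 2 setups
    }
}
```

With the three sample lines, `map` yields 3 lists, `flatMap` yields 12 words, and `mapPartition` yields the same 12 words while the setup counter only reaches 2, one per partition.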
2. Filter and Distinct
import java.util.ArrayList;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

public class FilterDistinctDemo {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<String> data = new ArrayList<String>();
        data.add("I love Beijing");
        data.add("I love China");
        data.add("Beijing is the capital of China");
        DataSource<String> text = env.fromCollection(data);
        DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {
            public void flatMap(String data, Collector<String> collection) throws Exception {
                String[] words = data.split(" ");
                for (String w : words) {
                    collection.collect(w);
                }
            }
        });
        // Remove duplicate words
        flatMapData.distinct().print();
        System.out.println("*********************");
        // Keep only words longer than 3 characters
        flatMapData.filter(new FilterFunction<String>() {
            public boolean filter(String word) throws Exception {
                return word.length() > 3;
            }
        }).print();
    }
}
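As a plain-Java sketch (not Flink code), `distinct` behaves like collecting the words into a set, and `filter` like a loop that keeps the elements matching a predicate. The class `DistinctFilterSketch` is hypothetical, and the insertion-ordered output is a property of `LinkedHashSet` here, not something Flink guarantees.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

// Plain-Java model of distinct() and filter() on the word list (no Flink involved).
final class DistinctFilterSketch {
    // distinct: drop duplicates. LinkedHashSet keeps first-seen order for
    // readability; Flink's distinct() makes no such ordering promise.
    static List<String> distinct(List<String> words) {
        return new ArrayList<>(new LinkedHashSet<>(words));
    }

    // filter: keep only the elements for which the predicate is true
    // (here: length greater than minLength, mirroring word.length() > 3).
    static List<String> longerThan(List<String> words, int minLength) {
        List<String> out = new ArrayList<>();
        for (String w : words) {
            if (w.length() > minLength) {
                out.add(w);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList(
                "I love Beijing I love China Beijing is the capital of China".split(" "));
        System.out.println(distinct(words));      // [I, love, Beijing, China, is, the, capital, of]
        System.out.println(longerThan(words, 3)); // [love, Beijing, love, China, Beijing, capital, China]
    }
}
```

Note that `filter` runs on the word list that still contains duplicates, so words such as "love" appear twice; applying `distinct()` first would leave only four words longer than 3 characters.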
3. Join
import java.util.ArrayList;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class JoinDemo {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // First table: user ID, name
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer, String>>();
        data1.add(new Tuple2<>(1, "Tom"));
        data1.add(new Tuple2<>(2, "Mike"));
        data1.add(new Tuple2<>(3, "Mary"));
        data1.add(new Tuple2<>(4, "Jone"));
        // Second table: user ID, city
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer, String>>();
        data2.add(new Tuple2<>(1, "北京"));
        data2.add(new Tuple2<>(2, "上海"));
        data2.add(new Tuple2<>(3, "廣州"));
        data2.add(new Tuple2<>(4, "重慶"));
        // Join the two tables to get: user ID, name, city
        DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
        DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);
        table1.join(table2).where(0).equalTo(0)
            /* First Tuple2<Integer,String>: a row of the first table
             * Second Tuple2<Integer,String>: a row of the second table
             * Tuple3<Integer,String,String>: the result returned for each joined pair */
            .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
                        Tuple2<Integer, String> table2) throws Exception {
                    return new Tuple3<Integer, String, String>(table1.f0, table1.f1, table2.f1);
                }
            }).print();
    }
}
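An equi-join such as `where(0).equalTo(0)` pairs every row of one input with every row of the other that has the same key. Flink's optimizer chooses the physical strategy itself (for example sort-merge or hash based), so the following is only an illustration of one common technique, a hash join, in plain Java. `HashJoinSketch` and its `Row` class are hypothetical stand-ins for `Tuple2<Integer, String>`, and the city names are romanized to keep the sketch ASCII-only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java hash join on the ID column (illustration only, not Flink code).
final class HashJoinSketch {
    // Hypothetical stand-in for Flink's Tuple2<Integer, String>
    static final class Row {
        final int id;
        final String value;

        Row(int id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    // Emits "id,leftValue,rightValue" for every pair of rows with equal IDs
    static List<String> innerJoin(List<Row> left, List<Row> right) {
        // Build phase: index the right input by its join key
        Map<Integer, List<String>> index = new HashMap<>();
        for (Row r : right) {
            index.computeIfAbsent(r.id, k -> new ArrayList<>()).add(r.value);
        }
        // Probe phase: look up each left row; rows without a match are dropped
        List<String> out = new ArrayList<>();
        for (Row l : left) {
            for (String rightValue : index.getOrDefault(l.id, Collections.emptyList())) {
                out.add(l.id + "," + l.value + "," + rightValue);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> users = Arrays.asList(new Row(1, "Tom"), new Row(2, "Mike"),
                new Row(3, "Mary"), new Row(4, "Jone"));
        List<Row> cities = Arrays.asList(new Row(1, "Beijing"), new Row(2, "Shanghai"),
                new Row(3, "Guangzhou"), new Row(4, "Chongqing"));
        // [1,Tom,Beijing, 2,Mike,Shanghai, 3,Mary,Guangzhou, 4,Jone,Chongqing]
        System.out.println(innerJoin(users, cities));
    }
}
```

Because the build side is held in a hash map, this technique works best when one input is small; rows whose ID appears on only one side are silently dropped, which is exactly what the outer joins in section 6 change.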
4. Cartesian product
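import java.util.ArrayList;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CartesianProductDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // First table: user ID, name
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer, String>>();
        data1.add(new Tuple2<>(1, "Tom"));
        data1.add(new Tuple2<>(2, "Mike"));
        data1.add(new Tuple2<>(3, "Mary"));
        data1.add(new Tuple2<>(4, "Jone"));
        // Second table: user ID, city
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer, String>>();
        data2.add(new Tuple2<>(1, "北京"));
        data2.add(new Tuple2<>(2, "上海"));
        data2.add(new Tuple2<>(3, "廣州"));
        data2.add(new Tuple2<>(4, "重慶"));
        DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
        DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);
        // Generate the Cartesian product: every row of table1 paired with every row of table2
        table1.cross(table2).print();
    }
}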
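`cross` produces every combination of the two inputs, so its output size is the product of the input sizes (4 × 4 = 16 rows here) and grows quickly as the inputs grow. A minimal plain-Java sketch of that pairing, using a hypothetical `CrossSketch` class that pairs names with romanized city names rather than whole tuples:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of cross(): pair every left element with every right element.
final class CrossSketch {
    static <A, B> List<String> cross(List<A> left, List<B> right) {
        List<String> out = new ArrayList<>();
        for (A a : left) {
            for (B b : right) {
                // Formatted like Flink's Tuple2.toString(), e.g. "(Tom,Beijing)"
                out.add("(" + a + "," + b + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("Tom", "Mike", "Mary", "Jone");
        List<String> cities = Arrays.asList("Beijing", "Shanghai", "Guangzhou", "Chongqing");
        List<String> pairs = cross(users, cities);
        System.out.println(pairs.size()); // 16
        System.out.println(pairs.get(0)); // (Tom,Beijing)
    }
}
```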
5. First-N
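import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class FirstNDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Data: employee name, salary, department number
        DataSet<Tuple3<String, Integer, Integer>> grade =
            env.fromElements(new Tuple3<String, Integer, Integer>("Tom", 1000, 10),
                new Tuple3<String, Integer, Integer>("Mary", 1500, 20),
                new Tuple3<String, Integer, Integer>("Mike", 1200, 30),
                new Tuple3<String, Integer, Integer>("Jerry", 2000, 10));
        // Take three records (Flink returns n arbitrary records, not necessarily in insertion order)
        grade.first(3).print();
        System.out.println("**********************");
        // Sort by department number, then by salary
        // (sortPartition sorts each parallel partition locally, not the whole data set)
        grade.sortPartition(2, Order.ASCENDING).sortPartition(1, Order.ASCENDING).print();
        System.out.println("**********************");
        // Group by department number and take one record per group
        // (without sortGroup, which record counts as "first" is not defined)
        grade.groupBy(2).first(1).print();
    }
}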
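The three calls above can be modelled on an ordinary list, assuming the whole data set sits in a single partition and records keep their insertion order; Flink itself guarantees neither. `FirstNSketch` and its `Employee` class are hypothetical stand-ins for `Tuple3<String, Integer, Integer>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of first(n), sortPartition and groupBy(...).first(1) (no Flink involved).
final class FirstNSketch {
    // Hypothetical stand-in for Tuple3<String, Integer, Integer>: name, salary, department
    static final class Employee {
        final String name;
        final int salary;
        final int dept;

        Employee(String name, int salary, int dept) {
            this.name = name;
            this.salary = salary;
            this.dept = dept;
        }

        @Override
        public String toString() {
            return "(" + name + "," + salary + "," + dept + ")";
        }
    }

    // first(n) on a single ordered partition: the first n records
    static <T> List<T> firstN(List<T> in, int n) {
        return new ArrayList<>(in.subList(0, Math.min(n, in.size())));
    }

    // sortPartition(2, ASCENDING).sortPartition(1, ASCENDING): department, then salary
    static List<Employee> sortByDeptThenSalary(List<Employee> in) {
        List<Employee> out = new ArrayList<>(in);
        out.sort(Comparator.comparingInt((Employee e) -> e.dept).thenComparingInt(e -> e.salary));
        return out;
    }

    // groupBy(2).first(1): keep the first record encountered for each department
    static List<Employee> firstPerDept(List<Employee> in) {
        Map<Integer, Employee> firsts = new LinkedHashMap<>();
        for (Employee e : in) {
            firsts.putIfAbsent(e.dept, e);
        }
        return new ArrayList<>(firsts.values());
    }

    public static void main(String[] args) {
        List<Employee> staff = Arrays.asList(new Employee("Tom", 1000, 10),
                new Employee("Mary", 1500, 20), new Employee("Mike", 1200, 30),
                new Employee("Jerry", 2000, 10));
        System.out.println(firstN(staff, 3));             // [(Tom,1000,10), (Mary,1500,20), (Mike,1200,30)]
        System.out.println(sortByDeptThenSalary(staff));  // [(Tom,1000,10), (Jerry,2000,10), (Mary,1500,20), (Mike,1200,30)]
        System.out.println(firstPerDept(staff));          // [(Tom,1000,10), (Mary,1500,20), (Mike,1200,30)]
    }
}
```

If a specific record per group is wanted in Flink (for example the highest salary), the group has to be sorted before `first(1)` is applied; otherwise the choice is left to the runtime.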
6. Outer joins
import java.util.ArrayList;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class OuterJoinDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // First table: user ID, name
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer, String>>();
        data1.add(new Tuple2<>(1, "Tom"));
        data1.add(new Tuple2<>(3, "Mary"));
        data1.add(new Tuple2<>(4, "Jone"));
        // Second table: user ID, city
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer, String>>();
        data2.add(new Tuple2<>(1, "北京"));
        data2.add(new Tuple2<>(2, "上海"));
        data2.add(new Tuple2<>(4, "重慶"));
        // Join the two tables to get: user ID, name, city
        DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
        DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);
        // Left outer join
        table1.leftOuterJoin(table2).where(0).equalTo(0)
            .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
                        Tuple2<Integer, String> table2) throws Exception {
                    // A left outer join keeps every row of the left table;
                    // table2 is null when a left row has no match
                    if (table2 == null) {
                        return new Tuple3<Integer, String, String>(table1.f0, table1.f1, null);
                    } else {
                        return new Tuple3<Integer, String, String>(table1.f0, table1.f1, table2.f1);
                    }
                }
            }).print();
        System.out.println("***********************************");
        // Right outer join
        table1.rightOuterJoin(table2).where(0).equalTo(0)
            .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
                        Tuple2<Integer, String> table2) throws Exception {
                    // A right outer join keeps every row of the right table;
                    // table1 is null when a right row has no match
                    if (table1 == null) {
                        return new Tuple3<Integer, String, String>(table2.f0, null, table2.f1);
                    } else {
                        return new Tuple3<Integer, String, String>(table2.f0, table1.f1, table2.f1);
                    }
                }
            }).print();
        System.out.println("***********************************");
        // Full outer join: keeps the rows of both tables, so either side may be null
        table1.fullOuterJoin(table2).where(0).equalTo(0)
            .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
                        Tuple2<Integer, String> table2) throws Exception {
                    if (table1 == null) {
                        return new Tuple3<Integer, String, String>(table2.f0, null, table2.f1);
                    } else if (table2 == null) {
                        return new Tuple3<Integer, String, String>(table1.f0, table1.f1, null);
                    } else {
                        return new Tuple3<Integer, String, String>(table1.f0, table1.f1, table2.f1);
                    }
                }
            }).print();
    }
}
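The three outer joins differ only in which keys survive and which side may be missing. A plain-Java sketch, assuming IDs are unique within each table (true for the sample data) and using a hypothetical `OuterJoinSketch` class with romanized city names:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java model of left, right and full outer joins on the ID column (no Flink involved).
// Assumes IDs are unique within each table, as in the example data.
final class OuterJoinSketch {
    enum Kind { LEFT, RIGHT, FULL }

    static List<String> outerJoin(Map<Integer, String> left, Map<Integer, String> right, Kind kind) {
        // Which keys survive: all left keys, all right keys, or the union of both
        Set<Integer> keys = new TreeSet<>(); // sorted only to make the output deterministic
        if (kind != Kind.RIGHT) {
            keys.addAll(left.keySet());
        }
        if (kind != Kind.LEFT) {
            keys.addAll(right.keySet());
        }
        List<String> out = new ArrayList<>();
        for (Integer id : keys) {
            // A missing side shows up as null, like the null Tuple2 passed to the JoinFunction
            out.add(id + "," + left.get(id) + "," + right.get(id));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> users = new HashMap<>();
        users.put(1, "Tom");
        users.put(3, "Mary");
        users.put(4, "Jone");
        Map<Integer, String> cities = new HashMap<>();
        cities.put(1, "Beijing");
        cities.put(2, "Shanghai");
        cities.put(4, "Chongqing");
        System.out.println(outerJoin(users, cities, Kind.LEFT));  // [1,Tom,Beijing, 3,Mary,null, 4,Jone,Chongqing]
        System.out.println(outerJoin(users, cities, Kind.RIGHT)); // [1,Tom,Beijing, 2,null,Shanghai, 4,Jone,Chongqing]
        System.out.println(outerJoin(users, cities, Kind.FULL));  // [1,Tom,Beijing, 2,null,Shanghai, 3,Mary,null, 4,Jone,Chongqing]
    }
}
```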
When reposting, please credit the source. Original article: https://www.uj5u.com/shujuku/164501.html
