題目要求對已給出的資料使用Spark統計每天的新增的用戶,
資料如下:

我的思路是:先對資料使用sortByKey算子按照日期進行排序,然后將<日期,用戶名>的鍵值對,鍵和值的位置互換,即變成<用戶名,日期>的鍵值對,然后使用reduceByKey只保留每個用戶名出現的第一條資料(因為一開始已經按照日期對資料進行排序),最后將<用戶名,日期>的鍵值對互換回去,然后使用groupByKey按照日期進行分組,最后再用sortByKey按照日期排序再輸出即可,
代碼如下
SparkConf sparkconf = new SparkConf().
setAppName(" ").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkconf);
JavaRDD<String> lines = sc.textFile("檔案路徑");//原始資料檔案路徑
JavaPairRDD<String,String>firstClean = lines.mapToPair(f->{
return new Tuple2<>(f.split(" ")[0],f.split(" ")[1]);
});//創建鍵值對
JavaPairRDD<String,String> rdd = firstClean.sortByKey();//按照日期排序
JavaPairRDD<String,String>rdd2 = rdd.mapToPair(f->{
return new Tuple2<>(f._2,f._1);
});//鍵值對位置互換生成新的鍵值對
JavaPairRDD<String,String> rdd3 = rdd2.reduceByKey((x,y)-> x);//保留用戶名出現的第一條資料
JavaPairRDD<String,String>rdd4 = rdd3.mapToPair(f->{
return new Tuple2<>(f._2,f._1);
});//鍵值對位置互換
JavaPairRDD<String, Iterable<String>> rdd5 = rdd4.groupByKey();//按照日期進行分組
JavaPairRDD<String, Iterable<String>> rdd6 = rdd5.sortByKey();//按照日期排序
rdd6.foreach(x -> System.out.println(x));//輸出
資料變化程序:






同時我也自己生成了資料進行一個資料清洗的程序,最后我沒有使用那個資料,但是大致流程也是一樣的,
資料生成代碼如下:
public class GenerateData{
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
Random r = new Random();
FileWriter fw = new FileWriter("生成的資料存放的位置及其名稱");
int x = r.nextInt(1100000)-100000;//生成的資料范圍
for(int y = 1;y <= x;y++) {
int year = r.nextInt(7)+2015;//年份15-21
int month = r.nextInt(13);//月份0-12
int day = r.nextInt(32);//日期0-31
int n = r.nextInt(10);//用戶名長度0-10
String username = getRandomString(n);
System.out.println(username);
fw.write(y + " " + year + "-" + month + "-" + day + " "+ username + "\r\n" );
fw.flush();//重繪緩沖區
}
}
/*隨機生成字符*/
public static String getRandomString(int length){
String str="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
Random random=new Random();
StringBuffer sb=new StringBuffer();
for(int i=0;i<length;i++){
int number=random.nextInt(62);
sb.append(str.charAt(number));
}
return sb.toString();
}
}
生成資料如下:

因為前面生成資料時不夠嚴謹,生成的資料有些問題,比如用戶名缺失,0月份,0日期,每個月都有31日這種問題,所以只有用Spark來資料清洗了,
代碼如下:
SparkConf sparkconf = new SparkConf().
setAppName(" ").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkconf);
JavaRDD<String> lines = sc.textFile("資料路徑");
JavaRDD<String> firstClean1 = lines.filter(f->{
return Findnull(f);
});//過濾掉用戶名缺失的資料
JavaPairRDD<String,String>firstClean2 = firstClean1.mapToPair(f->{
return new Tuple2<>(f.split(" ")[1],f.split(" ")[2]);
});
long counts = firstClean2.count();//統計還剩多少條資料
JavaPairRDD<Tuple2<Tuple2<Integer,Integer>,Integer>, String> secondClean1 = firstClean2.mapToPair( f -> {
return new Tuple2<Tuple2<Tuple2<Integer,Integer>,Integer>, String>(
new Tuple2<Tuple2<Integer,Integer>,Integer>(
new Tuple2<Integer, Integer>(Integer.valueOf(f._1.split("-")[1]),Integer.valueOf(f._1.split("-")[2])),
Integer.valueOf(f._1.split("-")[0])),f._2);
});//鍵值對瘋狂套娃hhh,變成<<<月,日>,年>,用戶名> 形式的鍵值對,這樣就能對日期比較好操作
JavaPairRDD<Tuple2<Tuple2<Integer,Integer>,Integer>, String> secondClean2 =
secondClean1.filter(f ->{
if(f._1._1._1() != 0) {
if(f._1._1._2() != 0) {
return true;
}
}
return false;
});//過濾調月份和日期為0的資料
JavaPairRDD<Tuple2<Tuple2<Integer,Integer>,Integer>, String> secondClean3 =
secondClean2.filter(f -> {
if (f._1._1._1()<=7) { //月份小于等于7時
if (f._1._1._1()%2==0 && f._1._1._1() !=2 ){
if (f._1._1._2() != 31) { //過濾掉偶數月的31日
return true;
}
}
else if(f._1._1._1() == 2 ){ //月份為2時
if(f._1._2()%4==0&&f._1._2()%100!=0||f._1._2()%400==0){ //根據閏年平年判斷2月是否保留29日
if (f._1._1._2() != 30 && f._1._1._2() != 31 && f._1._1._2() != 0) {
return true;
}
}else{
if (f._1._1._2() != 30 && f._1._1._2() != 31 && f._1._1._2() != 29 && f._1._1._2() != 0) {
return true;
}
}
}else{
return true;
}
}else{ //月份大于7時
if (f._1._1._1()%2==1){
if (f._1._1._2() != 31 ) { //過濾奇數月的31日
return true;
}
}else{
return true;
}
}
return false;
});
long counts2 = secondClean3.count();
過濾第三列為空值的資料方法:
public static Boolean Findnull(String a) throws Exception {
// TODO Auto-generated method stub
String []arr = a.split(" ");
String x = arr[0];
String y = arr[1];
try{
String z = arr[2];
}
catch(Exception e){
return false;
}
return true;
}
閑著無聊又寫了一個資料查詢的代碼,當然要把資料套娃成<<<月,日>,年>,用戶名>才可以用,(這一段代碼本來不是接在剛才的代碼上的,所以引數名有點不一樣,將其修改即可)
Scanner A = new Scanner(System.in);
System.out.println("請輸入需要查詢的日期:");
System.out.println("請輸入年份:");
int year = A.nextInt();
System.out.println("請輸入月份:");
int month = A.nextInt();
System.out.println("請輸入日期:");
int day = A.nextInt();
JavaPairRDD<Tuple2<Tuple2<Integer,Integer>,Integer>, Iterable<String>> secondClean3 =
secondClean2.filter(f -> {
if (month<13&&month>0) {
if(day<32&&day>0) {
if (f._1._2()==year && f._1._1._1()==month && f._1._1._2()==day) {
return true;
}else {
return false;
}
}else{
if (f._1._2()==year && f._1._1._1()==month) {
return true;
}else {
return false;
}
}
}else{
if (f._1._2()==year) {
return true;
}else {
return false;
}
}
});
secondClean3.foreach(x -> System.out.println(x));
System.out.println("是否需要保存?(保存請輸入“1”)");
int x = A.nextInt();
if (x == 1) {
System.out.println("請輸入新建的檔案夾名:");
String add = A.next();
secondClean3.saveAsTextFile("\""+add+"\"");
}
}
}
不足之處歡迎指正,hhh
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/395066.html
標籤:其他
下一篇:hive學習-----基礎陳述句
