各位大拿好,自學spark,發現sparksql做資料分析,很符合自己的口味.所以自己通過一系列的邏輯,做出了如下的代碼
import com.twitter.chill.Base64;
import com.yitian.bankPay.spark.common.DateList;
import com.yitian.bankPay.spark.common.HBaseUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
public class CustomerStatistices {
public static List<Map<String, Object>> niubi() {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local").setAppName("json");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
try {
//讀取txt文本
JavaRDD<String> lineRDD = sc.textFile("F:\\Download\\111.txt");
//將讀取的文本行String轉換為spark的Row型別
JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
@Override
public Row call(String s) throws Exception {
String[] strings = s.split("\t");
Row row = RowFactory.create(strings[0], strings[1], strings[2], strings[3], strings[4]);
return row;
}
});
//快取row
rowRDD = rowRDD.cache();
//rowRDD轉換成sparksql的dataset
List list = new ArrayList();
list.add(DataTypes.createStructField("hms", DataTypes.StringType, true));
list.add(DataTypes.createStructField("id", DataTypes.StringType, true));
list.add(DataTypes.createStructField("name", DataTypes.StringType, true));
list.add(DataTypes.createStructField("pg", DataTypes.StringType, true));
list.add(DataTypes.createStructField("url", DataTypes.StringType, true));
StructType structType = DataTypes.createStructType(list);
Dataset<Row> dataset = sqlContext.createDataFrame(rowRDD, structType);
//dataset快取成臨時表,表明為table
dataset.registerTempTable("table");
//操作臨時表table,查詢并組織結果回傳
Dataset<Row> ds1 = sqlContext.sql("select sum(case when SUBSTRING(hms,1,2)>=0 and SUBSTRING(hms,1,2)<=12 then 1 else 0 end)amnum,\n" +
"sum(case when SUBSTRING(hms,1,2)>=12 and SUBSTRING(hms,1,2)<=23 then 1 else 0 end)pmnum from table");
List<Row> rowList = ds1.toJavaRDD().collect();
List<Map<String, Object>> result = new ArrayList<>();
for (Row r : rowList) {
Map<String, Object> map = new HashMap<>();
map.put("上午", r.getLong(0));
map.put("下午", r.getLong(1));
result.add(map);
}
Dataset<Row> ds2 = sqlContext.sql("select SUBSTRING(hms,1,2)h,count(*) num from table group by SUBSTRING(hms,1,2)");
rowList = ds2.toJavaRDD().collect();
for (Row r : rowList) {
Map<String, Object> map = new HashMap<>();
map.put("time", r.getString(0));
map.put("num", r.getLong(1));
result.add(map);
}
Dataset<Row> ds3 = sqlContext.sql("select sum(case when SUBSTRING(hms,1,2)>=0 and SUBSTRING(hms,1,2)<=12 then 1 else 0 end)amnum,\n" +
"sum(case when SUBSTRING(hms,1,2)>=12 and SUBSTRING(hms,1,2)<=23 then 1 else 0 end)pmnum from table");
rowList = ds3.toJavaRDD().collect();
for (Row r : rowList) {
Map<String, Object> map = new HashMap<>();
map.put("上午", r.getLong(0));
map.put("下午", r.getLong(1));
result.add(map);
}
Dataset<Row> ds4 = sqlContext.sql("select SUBSTRING(hms,1,2)h,count(*) num from table group by SUBSTRING(hms,1,2)");
rowList = ds4.toJavaRDD().collect();
for (Row r : rowList) {
Map<String, Object> map = new HashMap<>();
map.put("time", r.getString(0));
map.put("num", r.getLong(1));
result.add(map);
}
Dataset<Row> ds5 = sqlContext.sql("select sum(case when SUBSTRING(hms,1,2)>=0 and SUBSTRING(hms,1,2)<=12 then 1 else 0 end)amnum,\n" +
"sum(case when SUBSTRING(hms,1,2)>=12 and SUBSTRING(hms,1,2)<=23 then 1 else 0 end)pmnum from table");
rowList = ds5.toJavaRDD().collect();
for (Row r : rowList) {
Map<String, Object> map = new HashMap<>();
map.put("上午", r.getLong(0));
map.put("下午", r.getLong(1));
result.add(map);
}
Dataset<Row> ds6 = sqlContext.sql("select SUBSTRING(hms,1,2)h,count(*) num from table group by SUBSTRING(hms,1,2)");
rowList = ds6.toJavaRDD().collect();
for (Row r : rowList) {
Map<String, Object> map = new HashMap<>();
map.put("time", r.getString(0));
map.put("num", r.getLong(1));
result.add(map);
}
return result;
} catch (Exception e) {
throw e;
} finally {
sc.stop();
}
}
}
我這一個方法運行下來,需要26秒,感覺不正常.不知道是邏輯哪里出了問題
反復的測驗,從單個sql執行,到多個sql執行,包括sql中是否進行了groupby和聚合,都影響效率,而且影響較大,每個sql都生成5個taks
jar如下
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
<!--===============Hadoop===============-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.10.0</version>
</dependency>
<!--===============HBase===============-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.4.0</version>
</dependency>
求懂得大佬幫看看,感謝
uj5u.com熱心網友回復:
沒用過 spark ,用過 flink,功能差不多。你是在本地運行26s還是在服務器上提交任務運行26s? 本地的話多多少少都有影響轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/263980.html
標籤:Java EE
