由於處理資料的需求，我寫了一個簡單的 Spark 應用來處理資料，本以為處理速度會有很大的提升，然而結果令我難以接受！
1. Spark 集群運行條件：3 個 worker 節點，HDFS 檔案系統，輸入資料約 2.5 GB，運行時間約 8 分鐘。
Spark 應用程式如下：
package examples;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import scala.Tuple6;
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
public class Compression {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
if (args.length < 4) {
System.err.println("Usage: Compression <file> <interval> <slice> <type>");
System.exit(1);
}
final Integer interval = Integer.valueOf(args[1]);
SparkConf conf=new SparkConf().setAppName("Compression"+args[3]);
JavaSparkContext ctx=new JavaSparkContext(conf);
JavaRDD<String> lines = ctx.textFile(args[0]);
JavaPairRDD<Integer, Tuple2<Integer, Double>> key_v = lines.mapToPair(
new PairFunction<String, Integer,Tuple2<Integer, Double>>() {
@Override
public Tuple2<Integer, Tuple2<Integer, Double>> call(String s) {
String[] x = SPACE.split(s);
Integer order = Integer.valueOf(x[0]);
Integer k = order/interval;
Tuple2<Integer, Double> v = new Tuple2<Integer, Double>(order%interval, Double.valueOf(x[1]));
return new Tuple2<Integer,Tuple2<Integer, Double>>(k,v);
}
});
JavaPairRDD<Integer, Iterable<Tuple2<Integer, Double>>> segments = key_v.groupByKey();
//JavaPairRDD<Integer, Iterable<Tuple2<Integer, Double>>> segments = key_v.distinct();
JavaPairRDD<Integer, Tuple6<Double, Double, Double, Double, Double, Double>> compressdata = segments.mapValues(
new Function<Iterable<Tuple2<Integer, Double>>, Tuple6<Double, Double, Double, Double, Double, Double>>() {
@Override
public Tuple6<Double, Double, Double, Double, Double, Double> call(Iterable<Tuple2<Integer, Double>> list) throws Exception {
// TODO Auto-generated method stub
Double max = Double.MIN_VALUE;
Double min = Double.MAX_VALUE;
Double total = 0.0;
Double start = 0.0;
Double end = 0.0;
Double avg = 0.0;
Double s = 0.0;
Integer len = 0;
for (Tuple2<Integer, Double> v : list) {
len ++;
if(v._1.equals(0)) start = v._2;
total += v._2;
if(v._2 > max) max = v._2;
if(v._2 < min) min = v._2;
}
avg = total / len;
Double temp = 0.0;
len -= 1;
for (Tuple2<Integer, Double> v : list) {
if(v._1.equals(len))end = v._2;
temp += (v._2 - avg)*(v._2 - avg);
}
s = Math.sqrt( temp / len);
return new Tuple6<Double, Double, Double, Double, Double, Double>(start, end, max, min, avg, s);
}
});
compressdata.saveAsHadoopFile("/SparkTest/Compression/result"+args[3], Text.class, IntWritable.class, TextOutputFormat.class);
System.exit(0);
}
}
2. 單機運行，耗時約 2 分鐘。單機程式如下：
package deal;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import javax.xml.crypto.Data;
/**
 * Single-machine counterpart of the Spark job.
 *
 * <p>Reads {@code "<order> <value>"} lines (split on a single space) and groups every
 * {@code interval} consecutive lines into one segment. It then writes one summary line per segment
 * via {@code onedata.ToString()}.
 *
 * <p>Usage: {@code Compression <in> <out> <interval>}. The input path, start time, end time and
 * elapsed milliseconds are appended to {@code records.txt} in the working directory.
 */
public class Compression {
    public static void main(String[] args) throws IOException {
        if (args.length < 3) {
            System.err.println("In,out,interval\n");
            System.exit(1);
        }
        Date startTime = new Date();
        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String timestart = format.format(startTime);

        File filer = new File(args[0]);
        File filew = new File(args[1]);
        int interval = Integer.parseInt(args[2]);
        // try-with-resources closes both streams (flushing the writer) even when parsing fails.
        try (BufferedReader reader = new BufferedReader(new FileReader(filer));
             BufferedWriter writer = new BufferedWriter(new FileWriter(filew))) {
            compress(reader, writer, interval);
        } catch (IOException e) {
            // Best effort, as before: report the failure but still record the run's timing.
            e.printStackTrace();
        }

        Date endTime = new Date();
        String timeend = format.format(endTime);
        long cha = endTime.getTime() - startTime.getTime();
        try (BufferedWriter rw = new BufferedWriter(new FileWriter(new File("records.txt"), true))) {
            rw.write(args[0] + "\t" + timestart + "\t" + timeend + "\t" + cha + "\n");
        }
    }

    /**
     * Streams {@code reader} and writes one summary line per block of {@code interval} lines.
     *
     * <p>The final, possibly partial, segment is flushed after the loop so it is not lost.
     *
     * @throws IllegalArgumentException if {@code interval} is not positive
     * @throws IOException if reading or writing fails
     */
    static void compress(BufferedReader reader, BufferedWriter writer, int interval)
            throws IOException {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive: " + interval);
        }
        onedata current = null;
        int line = 0;
        String text;
        while ((text = reader.readLine()) != null) {
            if (line % interval == 0) {
                flush(current, writer);
                current = new onedata(line / interval);
            }
            current.list.add(Double.valueOf(text.split(" ")[1]));
            line++;
        }
        flush(current, writer);
    }

    /** Computes and writes the summary of {@code segment}; does nothing if it is {@code null}. */
    private static void flush(onedata segment, BufferedWriter writer) throws IOException {
        if (segment != null) {
            segment.calculate();
            writer.write(segment.ToString());
        }
    }
}
/**
 * Accumulates the values of one segment and computes its summary statistics.
 *
 * <p>Callers append values to {@link #list} in input order, call {@link #calculate()}, then render
 * the result with {@link #ToString()}.
 */
class onedata {
    /** Values of this segment, in input order. */
    public ArrayList<Double> list = new ArrayList<Double>();
    // Initialized to the opposite extremes so any real value replaces them. Double.MIN_VALUE is
    // the smallest *positive* double and would be a wrong starting maximum for negative data.
    Double max = Double.NEGATIVE_INFINITY;
    Double min = Double.POSITIVE_INFINITY;
    Double start = 0.0;
    Double end = 0.0;
    Double avg = 0.0;
    /** Population standard deviation (divides by the element count). */
    Double s = 0.0;
    /** Segment index. */
    Integer order = 0;

    public onedata(Integer o) {
        order = o;
    }

    /**
     * Fills start, end, max, min, avg and s from {@link #list}.
     *
     * <p>If the list is empty, nothing is computed and the fields keep their current values.
     */
    public void calculate() {
        if (list.isEmpty()) {
            return;
        }
        start = list.get(0);
        end = list.get(list.size() - 1);

        // Primitive locals avoid re-boxing a Double on every iteration.
        double sum = 0.0;
        double hi = Double.NEGATIVE_INFINITY;
        double lo = Double.POSITIVE_INFINITY;
        for (double v : list) {
            sum += v;
            if (v > hi) {
                hi = v;
            }
            if (v < lo) {
                lo = v;
            }
        }
        max = hi;
        min = lo;
        avg = sum / list.size();

        double mean = avg;
        double squares = 0.0;
        for (double v : list) {
            double d = v - mean;
            squares += d * d;
        }
        s = Math.sqrt(squares / list.size());
    }

    /** Renders {@code order\t(start,end,max,min,avg,s)} followed by a newline. */
    public String ToString() {
        return order + "\t(" + start + "," + end + "," + max + "," + min + "," + avg + "," + s + ")\n";
    }

    @Override
    public String toString() {
        return ToString();
    }
}
求各位大牛解釋：為什麼 Spark 集群運行消耗的時間比單機多那麼多？謝謝！
uj5u.com熱心網友回復:
是不是同樣的代碼?
如果不是同樣的代碼,不好比較.
uj5u.com熱心網友回復:
處理邏輯基本都是一樣的,單機的代碼是正常的Java寫的處理資料的代碼。集群運行的代碼是Spark應用的代碼。
uj5u.com熱心網友回復:
集群是怎麼配置的？處理的資料量有多大？
uj5u.com熱心網友回復:
集群處理大資料有優勢，但集群啟動和分配任務是需要時間的。
轉載請註明出處，本文鏈接：https://www.uj5u.com/qita/80017.html
標籤:Spark
上一篇:請教《Spark 機器學習》的 python 源代碼檔案如何執行?
下一篇:一位三十歲女程式員的面試苦惱
