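import org.apache.spark.SparkConf;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class LinearRegression {
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf().setAppName("JavaStreamingLinearRegressionWithSGDE");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10L));
// textFileStream watches a directory and only reads files that appear in it after the job starts
JavaDStream<String> data = jssc.textFileStream("/LienarRegression/lpsa.data");
// Each line looks like "label,f1 f2 ... fn"
JavaDStream<LabeledPoint> parsedData = data.map(line -> {
String[] parts = line.split(",");
String[] features = parts[1].split(" ");
double[] v = new double[features.length];
for (int i = 0; i < features.length; i++) { // "< features.length - 1" would leave the last feature at 0.0
v[i] = Double.parseDouble(features[i]);
}
return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
});
parsedData.cache();
JavaDStream<String> data_1 = jssc.textFileStream("/LienarRegression/lpsa_1.data");
// TODO: convert this JavaDStream to a JavaPairDStream (JavaPairDStream<K,Vector> data) for predictOnValues
int numFeatures = 8; // each line of the sample data has 8 features
int numIterations = 3;
// The initial weight vector needs one entry per feature; the iteration count is set separately
StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.zeros(numFeatures))
.setNumIterations(numIterations);
model.trainOn(parsedData);
//model.predictOnValues();
jssc.start();
jssc.awaitTermination();
}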
}
Sample data: 2.5687881,1.16902610257751 0.855491905752846 2.03274448152093 1.22628985326088 1.89254797819741 2.02833774827712 3.11219574032972 2.68112551007152 (a comma separates the label from the features, and the features are separated by spaces)
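To check the parsing step on its own, here is a plain-Java sketch (no Spark dependency) that splits one line of this format into a label and a feature array. The names LpsaLineParser, Record and parse are hypothetical, and splitting on "\\s+" after trim() is an added assumption to tolerate stray spaces.

```java
import java.util.Arrays;

public class LpsaLineParser {
    // Hypothetical holder for one parsed line: the label and its feature values
    public static final class Record {
        public final double label;
        public final double[] features;

        public Record(double label, double[] features) {
            this.label = label;
            this.features = features;
        }
    }

    // Parses "label,f1 f2 ... fn": a comma separates the label from the features,
    // and the features are separated by whitespace.
    public static Record parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected \"label,features\" but got: " + line);
        }
        String[] tokens = parts[1].trim().split("\\s+");
        double[] v = new double[tokens.length];
        // The bound is tokens.length, so every feature (including the last) is parsed
        for (int i = 0; i < tokens.length; i++) {
            v[i] = Double.parseDouble(tokens[i]);
        }
        return new Record(Double.parseDouble(parts[0].trim()), v);
    }

    public static void main(String[] args) {
        Record r = parse("2.5687881,1.16902610257751 0.855491905752846 2.03274448152093 "
                + "1.22628985326088 1.89254797819741 2.02833774827712 3.11219574032972 2.68112551007152");
        System.out.println("label = " + r.label);
        System.out.println("features (" + r.features.length + ") = " + Arrays.toString(r.features));
    }
}
```

With a loop bound of `features.length - 1`, the last array entry would stay 0.0 instead of holding 2.68112551007152.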
To call predictOnValues, the argument has to be a JavaPairDStream<K,Vector> data. How do I convert a JavaDStream into a JavaPairDStream? Could someone please help, ideally with code?
Reply from a uj5u.com user:
Something like textFile("x").map(x => (x, "1"))?
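The Scala snippet maps each element to a tuple. In Spark's Java API the same idea is JavaDStream.mapToPair with a function that returns a scala.Tuple2, for example parsedData.mapToPair(p -> new Tuple2<>(p.label(), p.features())). The plain-Java sketch below (no Spark; Point and toPairs are hypothetical names, and Map.Entry stands in for Tuple2) only illustrates the shape of that transformation: one (key, value) pair per input element, keyed here by the label.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PairingSketch {
    // Hypothetical stand-in for LabeledPoint: a label plus its features
    public static final class Point {
        public final double label;
        public final double[] features;

        public Point(double label, double[] features) {
            this.label = label;
            this.features = features;
        }
    }

    // One (label, features) pair per point, mirroring the shape mapToPair produces
    public static List<Map.Entry<Double, double[]>> toPairs(List<Point> points) {
        return points.stream()
                .<Map.Entry<Double, double[]>>map(p -> new SimpleEntry<>(p.label, p.features))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Point> points = Arrays.asList(
                new Point(1.0, new double[] {0.5, 2.0}),
                new Point(-0.3, new double[] {1.5, -1.0}));
        for (Map.Entry<Double, double[]> e : toPairs(points)) {
            System.out.println(e.getKey() + " -> " + Arrays.toString(e.getValue()));
        }
    }
}
```

Keying by the label is a common choice for predictOnValues because each prediction then arrives next to the true value it should be compared with.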
Reply from a uj5u.com user:
Yes, but that is Scala, and I have to write this in Java. Could you check whether mine is right? parsedData is a JavaDStream<LabeledPoint>:
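// Vector here must be org.apache.spark.mllib.linalg.Vector, not java.util.Vector
JavaPairDStream<Double, Vector> temp = parsedData.mapToPair(new PairFunction<LabeledPoint, Double, Vector>() {
public Tuple2<Double, Vector> call(LabeledPoint p) {
// key = label, value = feature vector; features() already returns a Vector, so no cast is needed
return new Tuple2<Double, Vector>(p.label(), p.features());
}
});
Reply from a uj5u.com user: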
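I was using textFileStream before; now I have switched to socketTextStream to receive data on a port. Here is what I wrote. Could you check whether anything is wrong with it? I'm also new to Spark, so please take a look when you have time. The test data is the lpsa.data sample from the Spark repository on GitHub (https://github.com/apache/spark/blob/master/data/mllib/ridge-data/lpsa.data).
public static void main(String[] args) {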
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("JavaStreamingLinearRegressionWithSGDETest");
JavaStreamingContext jssc = new JavaStreamingContext(conf , Durations.seconds(2L));
JavaDStream<String> lines= jssc.socketTextStream("***.**.**.***", 9999);
JavaDStream<LabeledPoint> parsedData = lines.map(line -> {
String[] parts = line.split(",");
String[] features = parts[1].split(" ");
double[] v = new double[features.length];
for (int i = 0; i < features.length; i++) { // "< features.length - 1" would leave the last feature at 0.0
v[i] = Double.parseDouble(features[i]);
}
return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
});
parsedData.cache();
JavaPairDStream<Double, Vector> temp = parsedData.mapToPair(new PairFunction<LabeledPoint, Double, Vector>() {
public Tuple2<Double, Vector> call(LabeledPoint p) {
return new Tuple2<Double, Vector>(p.label() , (Vector) p.features());
}
});
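int numFeatures = 8; // each line of lpsa.data has 8 features
int numIterations = 3;
// The initial weight vector needs one entry per feature (not numIterations); the iteration count is set separately
StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.zeros(numFeatures))
.setNumIterations(numIterations);
model.trainOn(parsedData);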
model.predictOnValues(temp).print();
jssc.start();
try {
jssc.awaitTermination();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
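For readers new to the streaming model, the per-batch behaviour of trainOn and predictOnValues can be sketched in plain Java without Spark. This is a simplified illustration loosely modeled on MLlib's SGD, not its implementation: StreamingSgdSketch and its methods are hypothetical, it fits no intercept, and the stepSize / sqrt(iteration) schedule is an assumption. It makes two points from the code above concrete: the initial weight vector needs one entry per feature (so sizing it with numIterations is a bug), and each batch continues from the weights left by the previous one.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class StreamingSgdSketch {
    private final double[] weights;
    private final int numIterations;
    private final double stepSize;

    // numFeatures plays the role of Vectors.zeros(numFeatures): one weight per feature
    public StreamingSgdSketch(int numFeatures, int numIterations, double stepSize) {
        this.weights = new double[numFeatures];
        this.numIterations = numIterations;
        this.stepSize = stepSize;
    }

    // Linear prediction without an intercept: dot(weights, x); mismatched sizes are rejected
    public double predict(double[] x) {
        if (x.length != weights.length) {
            throw new IllegalArgumentException(
                    "expected " + weights.length + " features but got " + x.length);
        }
        double sum = 0.0;
        for (int j = 0; j < x.length; j++) {
            sum += weights[j] * x[j];
        }
        return sum;
    }

    // Analogue of trainOn for one batch: numIterations full-batch gradient steps on
    // squared error, starting from the weights left by the previous batch.
    public void update(List<double[]> xs, List<Double> ys) {
        int n = xs.size();
        if (n == 0) {
            return; // an empty batch leaves the model unchanged
        }
        for (int it = 1; it <= numIterations; it++) {
            double[] grad = new double[weights.length];
            for (int i = 0; i < n; i++) {
                double[] x = xs.get(i);
                double err = predict(x) - ys.get(i);
                for (int j = 0; j < x.length; j++) {
                    grad[j] += err * x[j];
                }
            }
            double step = stepSize / Math.sqrt(it); // assumed decaying step size
            for (int j = 0; j < weights.length; j++) {
                weights[j] -= step * grad[j] / n;
            }
        }
    }

    // Analogue of predictOnValues: keep each key, replace the features with a prediction
    public <K> List<Map.Entry<K, Double>> predictOnValues(List<Map.Entry<K, double[]>> keyed) {
        List<Map.Entry<K, Double>> out = new ArrayList<>();
        for (Map.Entry<K, double[]> e : keyed) {
            out.add(new SimpleEntry<>(e.getKey(), predict(e.getValue())));
        }
        return out;
    }

    public double[] weights() {
        return weights.clone();
    }

    public static void main(String[] args) {
        // Toy data with one feature and y = 2x; each loop pass acts like one streaming batch
        List<double[]> xs = Arrays.asList(new double[] {1.0}, new double[] {2.0}, new double[] {3.0});
        List<Double> ys = Arrays.asList(2.0, 4.0, 6.0);
        StreamingSgdSketch model = new StreamingSgdSketch(1, 50, 0.1);
        for (int batch = 0; batch < 3; batch++) {
            model.update(xs, ys);
        }
        System.out.println("weights = " + Arrays.toString(model.weights()));
        List<Map.Entry<String, double[]>> test = new ArrayList<>();
        test.add(new SimpleEntry<>("x=4", new double[] {4.0}));
        System.out.println(model.predictOnValues(test));
    }
}
```

Because update keeps the weights between calls, the three passes in main keep refining the same model, which is the behaviour a streaming trainer relies on as new batches arrive.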
Please credit the source when reposting. Original link: https://www.uj5u.com/qita/33964.html
