Scala 的 wordcount 範例(使用 scala.actors)
package com.wondersgroup.myscala
import scala.actors.{Actor, Future}
import scala.collection.mutable.ListBuffer
import scala.io.Source
// Messages handled by the word-count actor. Overall flow: count word frequencies
// per text file first, then aggregate the per-file results.
// SubmitTask asks the actor to count the words of file `f` (the actor opens it with Source.fromFile).
case class SubmitTask(f:String)
// StopTask makes the actor call exit().
case object StopTask
/** Actor that counts how often each word occurs in a single text file.
  *
  * Messages:
  *  - `SubmitTask(f)`: reads file `f`, splits every line on a single space and replies to the
  *    sender with a `Map[String, Int]` from word to number of occurrences.
  *  - `StopTask`: terminates the actor via `exit()`.
  */
class ActorTest3 extends Actor {
  override def act(): Unit = {
    while (true) {
      receive {
        case SubmitTask(f) =>
          val source = Source.fromFile(f)
          // Each line of the file becomes one list element. The lines are read eagerly so the
          // file handle can be closed immediately, even if reading fails.
          val lines = try source.getLines().toList finally source.close()
          // Each word of the file becomes one list element. Blank lines and repeated spaces make
          // split(" ") yield empty strings; those are dropped so "" is never counted as a word.
          val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)
          print("----------"+words)
          val pairs = words.map((_, 1))
          println("================"+pairs)
          // Group the (word, 1) pairs by word: word -> all of its pairs
          val grouped = pairs.groupBy(_._1)
          println("++++++"+grouped)
          // Build a strict map (word -> count) rather than a lazy mapValues view, so the message
          // sent to the caller does not carry the grouped lists along with it.
          val result = grouped.map { case (word, occurrences) => word -> occurrences.size }
          println("&&&&&&&&&&&&&&&&"+result)
          sender ! result
        case StopTask => exit()
      }
    }
  }
}
/** Entry point: starts one [[ActorTest3]] per input file, collects the per-file word counts
  * through futures and merges them into one overall word count, which is printed.
  */
object ActorTest3 {
  def main(args: Array[String]): Unit = {
    val files = Array("src/wordcount.txt","src/wordcount1.txt")
    // Started actors are remembered so they can be stopped once all results are in
    val actors = new ListBuffer[ActorTest3]
    // Futures whose result has not been collected yet
    val pending = new ListBuffer[Future[Any]]
    // Per-file word counts that have been collected
    val results = new ListBuffer[Map[String,Int]]

    // Submit one text-analysis task per file to its own actor
    for (f <- files) {
      val actor = new ActorTest3
      actor.start()
      actors += actor
      // `!!` sends the message and returns a future for the reply
      pending += (actor !! SubmitTask(f))
    }

    // Poll until every file's result has been collected
    while (pending.nonEmpty) {
      // Futures whose result is already available
      val done = pending.filter(_.isSet)
      print("@@@@@@@@@@@"+done)
      for (res <- done) {
        results += res.apply().asInstanceOf[Map[String,Int]]
        pending -= res
      }
      // Only wait when something is still outstanding; no pointless sleep after the last result
      if (pending.nonEmpty) Thread.sleep(5000)
    }

    // Each actor loops forever until it receives StopTask, so stop them explicitly
    actors.foreach(_ ! StopTask)

    // Merge the per-file results: sum the counts of identical words
    val res2 = results.flatten.groupBy(_._1).map {
      case (word, pairs) => word -> pairs.foldLeft(0)(_ + _._2)
    }
    println("******************"+res2)
  }
}
輸出
@@@@@@@@@@@ListBuffer()----------List(python, is, a, very, brief, language, It, is, also, a, shell, language, we, like, python)================List((python,1), (is,1), (a,1), (very,1), (brief,1), (language,1), (It,1), (is,1), (also,1), (a,1), (shell,1), (language,1), (we,1), (like,1), (python,1)) ----------List(python, java, go, python, c++, c++, java, ruby, c, javascript, c++)================List((python,1), (java,1), (go,1), (python,1), (c++,1), (c++,1), (java,1), (ruby,1), (c,1), (javascript,1), (c++,1)) ++++++Map(java -> List((java,1), (java,1)), c++ -> List((c++,1), (c++,1), (c++,1)), go -> List((go,1)), python -> List((python,1), (python,1)), c -> List((c,1)), ruby -> List((ruby,1)), javascript -> List((javascript,1))) ++++++Map(is -> List((is,1), (is,1)), shell -> List((shell,1)), a -> List((a,1), (a,1)), also -> List((also,1)), language -> List((language,1), (language,1)), brief -> List((brief,1)), python -> List((python,1), (python,1)), It -> List((It,1)), very -> List((very,1)), we -> List((we,1)), like -> List((like,1))) &&&&&&&&&&&&&&&&Map(is -> 2, shell -> 1, a -> 2, also -> 1, language -> 2, brief -> 1, python -> 2, It -> 1, very -> 1, we -> 1, like -> 1) &&&&&&&&&&&&&&&&Map(java -> 2, c++ -> 3, go -> 1, python -> 2, c -> 1, ruby -> 1, javascript -> 1) @@@@@@@@@@@ListBuffer(<function0>, <function0>)******************Map(is -> 2, shell -> 1, a -> 2, java -> 2, c++ -> 3, go -> 1, also -> 1, language -> 2, brief -> 1, python -> 4, It -> 1, c -> 1, ruby -> 1, very -> 1, we -> 1, like -> 1, javascript -> 1)
spark的wordcount
/** Word count with Spark SQL: reads a text file, splits it into words, registers them as a
  * temporary view and prints the words with their counts, most frequent first.
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("wordCount")
      .master("local[*]")
      .getOrCreate()
    try {
      // Read the data, one Dataset element per line
      val ds: Dataset[String] = spark.read.textFile("檔案路徑/word.txt")
      // Brings the implicit encoders into scope; flatMap() cannot be called without them
      import spark.implicits._
      // Split each line on a single space and flatten. Empty tokens (from blank lines or
      // repeated spaces) are dropped so "" is not counted as a word.
      val ds1: Dataset[String] = ds.flatMap(line => line.split(" ").filter(_.nonEmpty))
      // Register a temporary view so the data can be queried with SQL
      ds1.createTempView("word")
      // Run the SQL statement; results are ordered by count, descending
      val df: DataFrame = spark.sql("select value,count(*) count from word group by value order by count desc")
      // Display the result
      df.show()
    } finally {
      // Always shut the session down, even when the job above fails
      spark.stop()
    }
  }
}
mapreduce的wordcount
mapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
//import org.apache.hadoop.io.*;
//import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
/**
 * Mapper of the word-count job: emits (word, 1) for every word of every input line.
 *
 * Input key    LongWritable  position of the line as supplied by the input format
 *                            (with Hadoop's default TextInputFormat this is the byte
 *                            offset of the line in the file, not a line number)
 * Input value  Text          the content of one line
 * Output key   Text          a word
 * Output value IntWritable   the count for this occurrence (always 1)
 * @author lenovo
 *
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    // Output objects are created once and reused for every write
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    /**
     * Splits one line on single spaces and writes (word, 1) for each non-empty token.
     *
     * @param key     position of the line (unused)
     * @param value   the line to tokenize
     * @param context job context the pairs are written to
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1 Convert the line to a String
        String line = value.toString();
        // 2 Split into tokens
        String[] words = line.split(" ");
        // 3 Write each word to the next phase
        for (String word : words) {
            // Blank lines and repeated/leading spaces produce empty tokens;
            // skip them so "" is never counted as a word.
            if (word.isEmpty()) {
                continue;
            }
            k.set(word);
            context.write(k, v);
        }
    }
}
reducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer of the word-count job: receives all counts emitted for one word
 * (e.g. "hello 1", "hello 1") and writes the word with the sum of those counts.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Called once per distinct key with all values that share that key.
     *
     * @param key     the word
     * @param values  the counts emitted for this word
     * @param context job context the (word, total) pair is written to
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Add up every count recorded for this word
        int total = 0;
        java.util.Iterator<IntWritable> counts = values.iterator();
        while (counts.hasNext()) {
            total += counts.next().get();
        }

        // Emit the word together with its total number of occurrences
        IntWritable totalWritable = new IntWritable(total);
        context.write(key, totalWritable);
    }
}
driver
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver of the word-count job: configures compression, wires the mapper and reducer,
 * sets the input/output paths from the command line and submits the job.
 *
 * Usage: WordCountDriver &lt;input path&gt; &lt;output path&gt;
 */
public class WordCountDriver {
    /**
     * @param args args[0] is the input path, args[1] is the output path
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a clear message instead of an ArrayIndexOutOfBoundsException
        if (args.length < 2) {
            System.err.println("Usage: WordCountDriver <input path> <output path>");
            System.exit(2);
        }

        // 1 Create the job configuration
        Configuration configuration = new Configuration();
        // Enable compression of the map output
        configuration.setBoolean("mapreduce.map.output.compress", true);
        // Choose the codec used for the map output
        // configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
        configuration.setClass("mapreduce.map.output.compress.codec", DefaultCodec.class, CompressionCodec.class);
        Job job = Job.getInstance(configuration);

        // 2 Locate the jar through the driver class
        job.setJarByClass(WordCountDriver.class);

        // 3 Wire the mapper and the reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4 Types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 Types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Optional: add a combiner that pre-aggregates map output before the reduce phase.
        // Not every job can use one; the operation has to qualify for it.
        // job.setCombinerClass(WordcountCombiner.class);

        // Optional: input format for handling many small files (CombineTextInputFormat);
        // the system default is TextInputFormat.
        // job.setInputFormatClass(CombineTextInputFormat.class);
        // CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
        // CombineTextInputFormat.setMinInputSplitSize(job, 2097152);

        // 6 Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Enable compression of the reduce (final) output
        FileOutputFormat.setCompressOutput(job, true);
        // Choose the codec used for the final output
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

        // 7 Submit the job, wait for it to finish, and report success through the exit code
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}
combiner
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Combiner of the word-count job: sums the counts it receives for one word
 * and writes the word with that sum.
 */
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * @param key     the word
     * @param values  the counts received for this word
     * @param context job context the (word, sum) pair is written to
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Step 1: accumulate the counts
        int partialSum = 0;
        for (IntWritable partial : values) {
            partialSum = partialSum + partial.get();
        }

        // Step 2: emit the word with its accumulated count
        IntWritable combined = new IntWritable(partialSum);
        context.write(key, combined);
    }
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/177299.html
標籤:Scala
上一篇:Python生態_turtle庫
