The following content is reposted from my personal website:
It reads better there!
My personal website
MIT 6.824 study series

First, here is the official course website for easy reference:
mit6.824
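Lab 1 implementation:
First, read the Lab 1 requirements here. We start by running a few commands from the course site to set up the lab environment: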
```shell
$ git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824
$ cd 6.824
$ ls
Makefile src
$
```
This pulls down all the code we need.
The directory layout is roughly as follows:

Then continue reading the course site:
We supply you with a simple sequential mapreduce implementation in src/main/mrsequential.go. It runs the maps and reduces one at a time, in a single process. We also provide you with a couple of MapReduce applications: word-count in mrapps/wc.go, and a text indexer in mrapps/indexer.go. You can run word count sequentially as follows:
In other words, they provide a demo MapReduce application that you can run like this:
```shell
$ cd ~/6.824
$ cd src/main
$ go build -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run mrsequential.go wc.so pg*.txt
$ more mr-out-0
A 509
ABOUT 2
ACT 8
...
```
So I tried it in GoLand:

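It failed, and after pasting the error into Google I learned that this lab simply can't be done on Windows: Go's `-buildmode=plugin` is not supported there.
After a bit of thought (and a few more errors), I gave up on Windows and moved to Linux to keep going.
There the command prints the word counts for the supplied texts. (This was also the first useful demo I've built of a Hadoop-style framework; Hadoop's core is MapReduce as well.)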
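For reference, here is a sketch of what a word-count Map/Reduce pair like `mrapps/wc.go` can look like: `Map` splits the text into runs of letters and emits `(word, "1")` for each, and `Reduce` counts the values collected for one word. It is written from memory of the lab's wc app, so treat it as an approximation and check the repository for the real file; `KeyValue` is redeclared here only to keep the snippet self-contained.

```go
package main

import (
	"strconv"
	"strings"
	"unicode"
)

// KeyValue mirrors the pair type used by the lab's mr package (redeclared for this sketch).
type KeyValue struct {
	Key   string
	Value string
}

// Map emits (word, "1") for every maximal run of letters in contents.
// Words are case-sensitive, so "A" and "a" are different keys.
func Map(filename string, contents string) []KeyValue {
	isSeparator := func(r rune) bool { return !unicode.IsLetter(r) }
	words := strings.FieldsFunc(contents, isSeparator)
	kva := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce receives every "1" emitted for one word and returns how many there were.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}
```

Because keys are case-sensitive, uppercase words such as `A` are counted separately from `a`, which fits the `A 509` line in the sample output.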
Of the folders in the cloned repository, the one you actually write Lab 1 (MapReduce) code in is
the mr folder! Woohoo!
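About the MapReduce system architecture:
A MapReduce system consists of one master process and several worker processes, which communicate via RPC (Remote Procedure Call). The master assigns tasks to the workers, tracks which tasks have finished, and must cope with workers that crash or run too long. Each worker executes the task it was given, reports the result to the master when it is done, and then asks for the next task.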
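To make the RPC part concrete, here is a minimal sketch using Go's standard `net/rpc` package: a toy coordinator hands out task IDs, and a worker fetches one ID per call. The names `Coordinator`, `GetTask`, `TaskArgs`, `TaskReply`, `serve` and `requestTask` are hypothetical, and the sketch listens on a loopback TCP port for simplicity; the lab skeleton ships its own `server()` and `call()` helpers, which this does not reproduce.

```go
package main

import (
	"net"
	"net/rpc"
	"sync"
)

// Hypothetical RPC types. net/rpc encodes with encoding/gob, which (as far as
// I know) rejects structs with no exported fields, so the args carry one.
type TaskArgs struct {
	WorkerID int
}

type TaskReply struct {
	TaskID int
	Done   bool // true once every task has been handed out
}

// Coordinator is a toy stand-in for the Master: it hands out task IDs 0..n-1.
type Coordinator struct {
	mu   sync.Mutex
	next int
	n    int
}

// GetTask follows net/rpc's method rules: exported, (args, *reply), returns error.
func (c *Coordinator) GetTask(args *TaskArgs, reply *TaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.next >= c.n {
		reply.Done = true
		return nil
	}
	reply.TaskID = c.next
	c.next++
	return nil
}

// serve registers the coordinator on its own rpc.Server and accepts
// connections on a free loopback port, returning that port's address.
func serve(c *Coordinator) (string, error) {
	srv := rpc.NewServer()
	if err := srv.Register(c); err != nil {
		return "", err
	}
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	go srv.Accept(l)
	return l.Addr().String(), nil
}

// requestTask is what a worker does on each loop iteration: dial, call, close.
func requestTask(addr string) (TaskReply, error) {
	var reply TaskReply
	client, err := rpc.Dial("tcp", addr)
	if err != nil {
		return reply, err
	}
	defer client.Close()
	err = client.Call("Coordinator.GetTask", &TaskArgs{WorkerID: 1}, &reply)
	return reply, err
}
```

The service name in `Call` is the registered type's name plus the method name, which is why the article's worker calls strings like `"Master.HandleWorkerReport"`.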
Master structure:
```go
type Flag struct {
	processing bool
	finished   bool
}

type Master struct {
	FileNames      []string
	MapFlags       []Flag
	ReduceFlags    []Flag
	MapTaskCnts    []int
	ReduceTaskCnts []int
	MapAllDone     bool
	ReduceALLDone  bool
	MapNum         int
	ReduceNum      int
	Mut            sync.Mutex
}
```
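- FileNames: the names of the eight pg*.txt input files
- MapFlags: the state (processing/finished) of each map task
- ReduceFlags: the same, for each reduce task
- MapTaskCnts: the current sequence number of each map task. If a map task times out, HandleTimeOut clears its processing flag so the task can be reassigned, and the reassignment increments its sequence number by 1. If the worker that timed out later reports completion, its sequence number is smaller than the current one, so HandleWorkerReport can ignore the report and return.
- ReduceTaskCnts: the same, for reduce tasks
- MapAllDone: set to true once all map tasks have finished
- ReduceALLDone: set to true once all reduce tasks have finished
- MapNum: the number of map tasks
- ReduceNum: the number of reduce tasks
- Mut: a mutex guarding all of the fields above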
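The sequence-number trick in MapTaskCnts/ReduceTaskCnts is essentially a per-task generation counter. The sketch below isolates it; `taskSlot` and its methods are hypothetical names that fold one `Flag` and one counter entry into a single value.

```go
package main

// taskSlot is a hypothetical merge of one Flag with its MapTaskCnts entry.
type taskSlot struct {
	gen        int // bumped on every (re)assignment
	processing bool
	finished   bool
}

// assign hands the task out and returns the generation the worker must echo back.
func (s *taskSlot) assign() int {
	s.gen++
	s.processing = true
	return s.gen
}

// timeout makes an unfinished task assignable again.
func (s *taskSlot) timeout() {
	if !s.finished {
		s.processing = false
	}
}

// report accepts a success only from the holder of the current generation;
// a late report from a timed-out worker carries an older generation and is ignored.
func (s *taskSlot) report(gen int) bool {
	if gen != s.gen {
		return false
	}
	s.finished = true
	s.processing = false
	return true
}
```

Ignoring the stale report is safe here because both workers ran the same deterministic task on the same input.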
Worker structure:
```go
type TaskState int

const (
	MapState    TaskState = 0
	ReduceState TaskState = 1
	StopState   TaskState = 2
	WaitState   TaskState = 3
)

type WorkerTask struct {
	MapID          int
	ReduceID       int
	ReduceNum      int
	MapNum         int
	MapTaskCnt     int
	ReduceTaskCnt  int
	State          TaskState
	FileName       string
	MapFunction    func(string, string) []KeyValue
	ReduceFunction func(string, []string) string
}
```
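- MapID and ReduceID: the ID of the map task and of the reduce task
- MapNum and ReduceNum: the total number of map tasks and of reduce tasks
- MapTaskCnt and ReduceTaskCnt: the sequence numbers of the map task and of the reduce task
- State: one of four states. MapState means the worker should run a map task, ReduceState that it should run a reduce task, WaitState that there is nothing to do right now so it sleeps and asks again, and StopState that all tasks are done and it can exit.
- FileName: the input file a map task reads
- MapFunction and ReduceFunction: the map or reduce function the worker runs, depending on State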
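Since the four states are a plain enum, a common Go idiom is to declare them with `iota` and give the type a `String` method so that logs print names instead of numbers. This is an optional addition, not part of the article's code; `iota` yields the same values 0 to 3 as the explicit constants.

```go
package main

type TaskState int

const (
	MapState    TaskState = iota // 0: run a map task
	ReduceState                  // 1: run a reduce task
	StopState                    // 2: all work is done, exit
	WaitState                    // 3: nothing assignable yet, sleep and retry
)

// String implements fmt.Stringer, so fmt's %v prints the state's name.
func (s TaskState) String() string {
	switch s {
	case MapState:
		return "Map"
	case ReduceState:
		return "Reduce"
	case StopState:
		return "Stop"
	case WaitState:
		return "Wait"
	}
	return "Unknown"
}
```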
Master methods:
Creating the Master:
```go
func MakeMaster(files []string, nReduce int) *Master {
	m := Master{
		FileNames:      files,
		MapFlags:       make([]Flag, len(files)),
		ReduceFlags:    make([]Flag, nReduce),
		MapNum:         len(files),
		ReduceNum:      nReduce,
		MapAllDone:     false,
		ReduceALLDone:  false,
		MapTaskCnts:    make([]int, len(files)),
		ReduceTaskCnts: make([]int, nReduce),
	}
	m.server()
	// Start the background timeout sweeper.
	args, reply := NoArgs{}, NoReply{}
	go m.HandleTimeOut(&args, &reply)
	return &m
}
```
Assigning a task to a worker:
```go
func (m *Master) CreateWorkerTask(args *NoArgs, workerTask *WorkerTask) error {
	m.Mut.Lock()
	defer m.Mut.Unlock()
	if !m.MapAllDone {
		for idx := 0; idx < m.MapNum; idx++ {
			// A task that is neither running nor finished can be handed out.
			if !m.MapFlags[idx].processing && !m.MapFlags[idx].finished {
				workerTask.ReduceNum = m.ReduceNum
				workerTask.MapNum = m.MapNum
				workerTask.State = MapState
				workerTask.MapID = idx
				workerTask.FileName = m.FileNames[idx]
				m.MapTaskCnts[idx]++ // new sequence number for this assignment
				workerTask.MapTaskCnt = m.MapTaskCnts[idx]
				m.MapFlags[idx].processing = true
				return nil
			}
		}
		// Every map task is running or done; reduces must wait for all maps.
		workerTask.State = WaitState
		return nil
	}
	if !m.ReduceALLDone {
		for idx := 0; idx < m.ReduceNum; idx++ {
			if !m.ReduceFlags[idx].processing && !m.ReduceFlags[idx].finished {
				workerTask.State = ReduceState
				workerTask.ReduceNum = m.ReduceNum
				workerTask.MapNum = m.MapNum
				workerTask.ReduceID = idx
				m.ReduceTaskCnts[idx]++
				workerTask.ReduceTaskCnt = m.ReduceTaskCnts[idx]
				m.ReduceFlags[idx].processing = true
				return nil
			}
		}
		workerTask.State = WaitState
		return nil
	}
	workerTask.State = StopState
	return nil
}
```
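The function first takes the mutex and checks MapAllDone. If it is false, it loops over the map tasks: a task whose processing and finished flags are both false is free to assign, so its parameters are written into the reply, and the master marks the task as processing and records its new sequence number. If no map task is free, some are still running and the rest are finished, so the worker is told to wait. The ReduceALLDone branch works the same way. Once both phases are done, the worker receives StopState.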
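The scan that CreateWorkerTask performs for each phase can be sketched on its own. In this hypothetical `pick` helper the sequence counters are left out, and "phase done" is derived directly from the flags, whereas the article's code relies on MapAllDone/ReduceALLDone being set by HandleWorkerReport; otherwise the decision rule is the same.

```go
package main

type Flag struct {
	processing bool
	finished   bool
}

// Decision is a hypothetical result type for this sketch.
type Decision int

const (
	Assign    Decision = iota // a free task was found and marked processing
	Wait                      // tasks remain, but all of them are in progress
	PhaseDone                 // every task in the phase has finished
)

// pick returns the first task that is neither processing nor finished,
// marking it processing; otherwise it tells the caller to wait or move on.
func pick(flags []Flag) (int, Decision) {
	allFinished := true
	for i := range flags {
		if !flags[i].processing && !flags[i].finished {
			flags[i].processing = true
			return i, Assign
		}
		if !flags[i].finished {
			allFinished = false
		}
	}
	if allFinished {
		return -1, PhaseDone
	}
	return -1, Wait
}
```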
Handling worker reports:
```go
func (m *Master) HandleWorkerReport(wr *WorkerReportArgs, task *NoReply) error {
	m.Mut.Lock()
	defer m.Mut.Unlock()
	if wr.IsSuccess {
		// Only the latest assignment of a task may mark it finished;
		// reports from timed-out workers carry an older sequence number.
		if wr.State == MapState {
			if wr.MapTaskCnt == m.MapTaskCnts[wr.MapID] {
				m.MapFlags[wr.MapID].finished = true
				m.MapFlags[wr.MapID].processing = false
			}
		} else {
			if wr.ReduceTaskCnt == m.ReduceTaskCnts[wr.ReduceID] {
				m.ReduceFlags[wr.ReduceID].finished = true
				m.ReduceFlags[wr.ReduceID].processing = false
			}
		}
	} else {
		// On failure, make an unfinished task assignable again.
		if wr.State == MapState {
			if !m.MapFlags[wr.MapID].finished {
				m.MapFlags[wr.MapID].processing = false
			}
		} else {
			if !m.ReduceFlags[wr.ReduceID].finished {
				m.ReduceFlags[wr.ReduceID].processing = false
			}
		}
	}
	// Recompute the phase-completion flags.
	for id := 0; id < m.MapNum; id++ {
		if !m.MapFlags[id].finished {
			break
		} else if id == m.MapNum-1 {
			m.MapAllDone = true
		}
	}
	for id := 0; id < m.ReduceNum; id++ {
		if !m.ReduceFlags[id].finished {
			break
		} else if id == m.ReduceNum-1 {
			m.ReduceALLDone = true
		}
	}
	return nil
}
```
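The argument carries a flag saying whether the task succeeded. On success, the master checks the task type and sequence number: if the sequence number matches the master's, the task is marked finished; if not, the report comes from a worker whose task already timed out and was reassigned, so it is ignored. On failure, the master checks whether the task is finished (the failing worker may be one that already timed out); if it is not, processing is cleared so that CreateWorkerTask can hand the task out again. Finally, it checks whether all map tasks, and all reduce tasks, have finished and sets MapAllDone and ReduceALLDone accordingly.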
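Here is a condensed, single-phase version of the report logic above, with the all-done check pulled into a helper. `applyReport` and `allFinished` are hypothetical names; `allFinished` gives the same answer as the article's break/last-index loop, except that it also counts an empty phase as done (the original loop never runs when MapNum or ReduceNum is 0, so the flag would stay false).

```go
package main

type Flag struct {
	processing bool
	finished   bool
}

// applyReport condenses HandleWorkerReport for one phase (map or reduce).
// cnts[id] is the latest sequence number handed out for task id.
func applyReport(flags []Flag, cnts []int, id, cnt int, success bool) {
	if success {
		if cnt == cnts[id] { // stale reports from timed-out workers fail this check
			flags[id].finished = true
			flags[id].processing = false
		}
	} else if !flags[id].finished {
		flags[id].processing = false // let the scheduler hand it out again
	}
}

// allFinished reports whether every task in the phase has finished.
func allFinished(flags []Flag) bool {
	for _, f := range flags {
		if !f.finished {
			return false
		}
	}
	return true
}
```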
Handling timeouts:
```go
func (m *Master) HandleTimeOut(args *NoArgs, reply *NoReply) error {
	for {
		m.Mut.Lock()
		if m.MapAllDone && m.ReduceALLDone {
			m.Mut.Unlock()
			break
		}
		// Free every unfinished task of the current phase so CreateWorkerTask
		// can hand it out again; stale reports are filtered by the sequence numbers.
		if !m.MapAllDone {
			for idx := 0; idx < m.MapNum; idx++ {
				if !m.MapFlags[idx].finished {
					m.MapFlags[idx].processing = false
				}
			}
		} else {
			for idx := 0; idx < m.ReduceNum; idx++ {
				if !m.ReduceFlags[idx].finished {
					m.ReduceFlags[idx].processing = false
				}
			}
		}
		m.Mut.Unlock()
		// Sleep without holding the lock so RPC handlers are not blocked.
		time.Sleep(2000 * time.Millisecond)
	}
	return nil
}
```
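Handling timeouts is simple. The loop first checks whether MapAllDone and ReduceALLDone are both true, and exits if so. Otherwise it clears the processing flag of every unfinished task in the current phase, so that CreateWorkerTask can hand those tasks out again. Finally it releases the lock and sleeps for 2 s. So HandleTimeOut sweeps every 2 seconds, but each sweep resets every unfinished task regardless of when it was assigned: a task handed out just before a sweep can be reassigned after running for only a few milliseconds. This is safe thanks to the sequence numbers (at worst some work is done twice), but the effective timeout is anywhere between 0 and 2 s rather than exactly 2 s.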
Worker methods
Creating a worker
```go
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	wt := WorkerTask{
		MapFunction:    mapf,
		ReduceFunction: reducef,
	}
	for {
		wt.GetWorkerTask() // asks the master for a task via RPC
		switch wt.State {
		case MapState:
			wt.DoMapWork()
		case ReduceState:
			wt.DoReduceWork()
		case StopState:
			return // all tasks are done
		case WaitState:
			time.Sleep(300 * time.Millisecond) // nothing assignable yet
		}
	}
}
```
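mrworker.go calls Worker, passing in the map and reduce functions. Worker builds a WorkerTask from them and enters a loop that calls GetWorkerTask. GetWorkerTask calls Master.CreateWorkerTask over RPC with the two arguments and, once a task has been assigned, copies its parameters and state into the worker. Depending on that state, the worker then runs the task, sleeps, or exits.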
Map Work
```go
func (wt *WorkerTask) DoMapWork() {
	file, err := os.Open(wt.FileName)
	if err != nil {
		wt.ReportWorkerTask(err)
		return
	}
	content, err := ioutil.ReadAll(file)
	file.Close()
	if err != nil {
		wt.ReportWorkerTask(err)
		return
	}
	kvs := wt.MapFunction(wt.FileName, string(content))
	// Partition the pairs into ReduceNum buckets by key hash.
	intermediate := make([][]KeyValue, wt.ReduceNum)
	for _, kv := range kvs {
		idx := ihash(kv.Key) % wt.ReduceNum
		intermediate[idx] = append(intermediate[idx], kv)
	}
	// Bucket idx of map task MapID goes to file mr-<MapID>-<idx>.
	for idx := 0; idx < wt.ReduceNum; idx++ {
		intermediateFileName := fmt.Sprintf("mr-%d-%d", wt.MapID, idx)
		data, err := json.Marshal(intermediate[idx])
		if err == nil {
			err = ioutil.WriteFile(intermediateFileName, data, 0644)
		}
		if err != nil {
			wt.ReportWorkerTask(err)
			return
		}
	}
	wt.ReportWorkerTask(nil)
}

// ReportWorkerTask tells the master how the current task went; a non-nil
// err marks the report as a failure so the master can reassign the task.
func (wt *WorkerTask) ReportWorkerTask(err error) {
	wra := WorkerReportArgs{
		MapID:     wt.MapID,
		ReduceID:  wt.ReduceID,
		State:     wt.State,
		IsSuccess: err == nil,
	}
	if wt.State == MapState {
		wra.MapTaskCnt = wt.MapTaskCnt
	} else {
		wra.ReduceTaskCnt = wt.ReduceTaskCnt
	}
	wrr := NoReply{}
	call("Master.HandleWorkerReport", &wra, &wrr)
}
```
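A map task reads its input file, calls MapFunction to produce key/value pairs, and uses the hash function to decide which of the ReduceNum buckets each key belongs to. Each bucket is then written to its own intermediate file, named after the map task and the bucket (mr-X-Y). Finally the worker calls ReportWorkerTask; passing nil means success, while a non-nil error marks the report as a failure. ReportWorkerTask in turn calls Master.HandleWorkerReport over RPC to report the result.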
Reduce Work
```go
func (wt *WorkerTask) DoReduceWork() {
	// Gather every value for each key from all map outputs for this bucket.
	kvsReduce := make(map[string][]string)
	for idx := 0; idx < wt.MapNum; idx++ {
		filename := fmt.Sprintf("mr-%d-%d", idx, wt.ReduceID)
		content, err := ioutil.ReadFile(filename)
		if err != nil {
			wt.ReportWorkerTask(err)
			return
		}
		kvs := make([]KeyValue, 0)
		if err := json.Unmarshal(content, &kvs); err != nil {
			wt.ReportWorkerTask(err)
			return
		}
		for _, kv := range kvs {
			// append on a missing key starts from a nil slice, so no existence check is needed.
			kvsReduce[kv.Key] = append(kvsReduce[kv.Key], kv.Value)
		}
	}
	ReduceResult := make([]string, 0, len(kvsReduce))
	for key, val := range kvsReduce {
		ReduceResult = append(ReduceResult, fmt.Sprintf("%v %v\n", key, wt.ReduceFunction(key, val)))
	}
	outFileName := fmt.Sprintf("mr-out-%d", wt.ReduceID)
	err := ioutil.WriteFile(outFileName, []byte(strings.Join(ReduceResult, "")), 0644)
	wt.ReportWorkerTask(err)
}
```
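A reduce task first reads every intermediate file for its bucket (one from each map task) and groups together the values that share a key. It then calls ReduceFunction once per key and writes the results to mr-out-<ReduceID>.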
END:
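That's more or less all there is to the MapReduce implementation. To sum up: map emits a (word, 1) pair for every word in each file and saves the pairs into separate buckets; reduce aggregates each bucket to produce the final result. Wrapping these two phases together gives you MapReduce.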
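The summary can be checked with a single-process sketch that wires the two phases together: map tasks emit (word, 1) pairs into nReduce buckets chosen by `ihash`, and each bucket can be reduced independently because every occurrence of a key lands in the same bucket. `wordCount` is a hypothetical name, and there are no files, RPCs or failures here.

```go
package main

import (
	"hash/fnv"
	"strings"
	"unicode"
)

type KeyValue struct {
	Key   string
	Value string
}

func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// wordCount runs one map task per input and one reduce per bucket, in memory.
func wordCount(inputs []string, nReduce int) map[string]int {
	// Map phase: buckets[r] collects every pair destined for reducer r.
	buckets := make([][]KeyValue, nReduce)
	for _, text := range inputs {
		words := strings.FieldsFunc(text, func(r rune) bool { return !unicode.IsLetter(r) })
		for _, w := range words {
			r := ihash(w) % nReduce
			buckets[r] = append(buckets[r], KeyValue{Key: w, Value: "1"})
		}
	}
	// Reduce phase: group each bucket by key and count the values.
	result := make(map[string]int)
	for _, bucket := range buckets {
		groups := make(map[string][]string)
		for _, kv := range bucket {
			groups[kv.Key] = append(groups[kv.Key], kv.Value)
		}
		for k, vs := range groups {
			result[k] = len(vs)
		}
	}
	return result
}
```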
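For the MapReduce paper itself, you can read it here.
Since it's a 2004 paper, plenty of translations are available by now (honestly, who reads the original, where you know every word but can't tell what the sentence is saying?).
Finally, a screenshot of the tests passing: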

-- MIT6.824 lab1 end
----------- 2020.10.11
@copyright ------------baijianruoliorz@Github--------------------------------
If you repost this, please credit the source. Link to this article: https://www.uj5u.com/shujuku/168944.html
