实验前请安装 Go
环境,编辑器我采用 VS Code
我的实验环境为
Manjaro Linux
,因此就不用虚拟机了并且由于官网的不公开协议,我不会上传代码到
github
上,博客也不会挂在搜索引擎上
实验要求
使用 Go
实现一个简单的 MapReduce
框架,实现的功能为简单的 Word Count
MapReduce
框架简介
这个简单版本的 MapReduce
省去了原文中的一些步骤(例如切分文件等),所以这里解释我们需要实现的 MapReduce
框架是什么样的。
由于我们是使用单机器多进程来模拟多机器的,所以下面我都以进程来描述原框架中的各个机器
其原理十分简单,从宏观上而言,一个MapReduce
框架由一个 master
进程(这里应该叫 coordinator
?但字母太多了,统一用 matser
说吧)和多个 Worker
进程组成。
master
扮演着服务器的角色,其追踪着各个Task
的状态与阶段状态,为Worker
分配Task
并处理Worker
已完成的任务(如果需要处理的话)Worker
扮演着驴的角色,只负责做从master
那要来的任务,做完就一直问master
要新的任务,并把完成的任务上报给master
框架各个部分的设计
官网上给出了很多的要求,直接看可能会眼花缭乱不知道应该如何下手(说的就是我),所以这里对照要求一步一步的给出设计思路
Task
首先我们明确这个 Worker
处理的最小单位,任何一个 Map
/ Reduce
任务都可以视为一个 Task
, 一个完整的 MapReduce
过程被称为一个 Job
(但在这里我们不会考虑 Job
这个概念,因为都是只做一次 MapReduce
过程)
对于一次 Task
,事实上这是 Worker
通过 RPC
从 master
中拿到的关于任务的一切信息,所以我们必须设计一些冗余项来保证信息是完整的。
我们可以分步来进行设计,首先,显然我们设计如下:
type Phase int
const ( Map Phase = iota Reduce Exit Wait)
type Task struct { TaskPhase Phase // task phase for worker InputFile string // input file name}
这样,对于 Worker
而言,我们就知道了
- 当前应该执行的是
Map
任务还是Reduce
任务 - 任务的输入文件
但考虑我们的任务为 word count
, 我们的 Map
任务需要给出中间生成的文件,并且中间生成文件的命名为 mr-x-y
,其中 x
为这是第几个 Map
任务(也就是这是第几个输入文件),y
为相同键值所在的 Reduce
任务的编号,例如 y = ihash(key) % nReduce
意思为键为 Key
的Kv
应当让索引为 y
的 Reduce
来处理
在 work.go
中已给出了 ihash()
的实现,所以我们还需要的是 nReduce
的值,并且我们还需要一个数组来存储中间生成文件的文件名(因为是本地所以直接存文件名就好) ,因此我们二次设计如下:
type Task struct { TaskPhase Phase // task phase for worker nReduce int // for map task InputFile string // input file name Intermediates []string // map task output}
这样我们做完了最初版本的设计,这样的 Task
就可以在 Worker
与 master
中传输了
Master
master
需要有以下功能:
-
在启动时根据指定的输入文件数及
Reduce Task
数,生成Map Task
及Reduce Task
-
响应
Worker
的Task
申请RPC
请求,分配可用的Task
给到Worker
处理 -
追踪
Task
的完成情况,在所有Map Task
完成后进入Reduce
阶段,开始派发Reduce Task
;在所有Reduce Task
完成后标记作业已完成并退出
由于我们需要对 Task
的完成情况进行追踪,并且需要给 Worker
任务,那么这里我们可以通过 map
与 chan
来实现这两个功能
首先,我们设计对 Task
任务的追踪:
type MasterTaskStatus int
const ( Idle MasterTaskStatus = iota InProcess Completed)
// MasterTask// Master's data structure , store every status of Map/Reduce tasktype MasterTask struct { TaskStatus MasterTaskStatus // status of task TaskRef *Task // Reference of task MachineId string // machine id}
这里对 MasterTask
的设计借鉴了原论文中的设计,包括 TaskStatus
与 MachineId
这样,我们就可以完成对 Task
的追踪,接下来就可以对 Master
的结构进行设计了:
type Coordinator struct { // Your definitions here. mutex sync.Mutex // mutex TaskQueue chan *Task // task queue TaskMeta map[int]*MasterTask // task meta info nReduce int // number of Reduce task MasterPhase Phase // phase of all task InputFiles []string // Input files Intermediates [][]string // map task output}
mutex
为互斥锁,用于并发控制nReduce
用以传送给每一个Task
- 消息队列
TaskQueue
用以为Worker
发送Task
,由于chan
自带锁,这样就避免了我们自己手写一个带锁的队列( TaskMeta
为一个map
,其键值为index
,实际上就是文件的序号,当然在这里可以当作Task
的索引,值为MasterTask
,注意这个结构体实际上是包含了Task
的MasterPhase
记录了当前MapReduce
任务已经进行到了什么阶段InputFiles
记录了Map
任务的输入文件Intermediates
为一个文件列表,由于在前面我们提到,Map
任务会生成一系列中间文件mr-X-Y
,其中Y
为Reduce
的编号,所以在这里,我们需要把中间文件存入这里,Intermediates[Y] = mr-i-Y
其中
我们在这里添加了 TaskMeta
之后,显然我们在 Task
中还需要一个值来记录,我们当前的 Task
在 Master
中所对应的元数据应该是什么,所以我们设计 Task
如下所示:
type Task struct { TaskPhase Phase // task phase for worker nReduce int // for map task InputFile string // input file name TaskNumber int // index of task in MasterMeta Intermediates []string // map task output}
(我发誓这是最终版本了)
功能实现
首先最简单的功能,应该是 RPC
中 master
与 worker
之间的任务派发,我们从这一步开始写,但首先,我们需要明确在 master
初始化时需要完成的事情:
func MakeCoordinator(files []string, nReduce int) *Coordinator {
max := func(a int, b int) int { if a > b { return a } return b }
c := Coordinator{ MasterPhase: Map, TaskMeta: make(map[int]*MasterTask), TaskQueue: make(chan *Task, max(len(files), nReduce)), InputFiles: files, Intermediates: make([][]string, nReduce), }
// Your code here.
// init map tasks without machine id (not send yet)
for idx, file := range files { task := Task{ TaskPhase: Map, nReduce: nReduce, InputFile: file, TaskNumber: idx, } c.TaskQueue <- &task c.TaskMeta[idx] = &MasterTask{ TaskStatus: Idle, TaskRef: &task, MachineId: "", } }
c.server()
return &c}
由于 RPC
的存在,因此我们需要设定请求参数与返回参数。
在这里,由于我们跟踪 Task
的完成情况,所以这就要求 Worker
需要向 Master
报告自己是否完成了上一个任务,Master
又会向 Worker
发送新的任务,因此这两步我们可以合成一个部分来看待。
于是我们可以设计请求参数与返回参数如下:
type AskArgs struct { MachineId string // identity of workers LastTask Task // last completed task}
type ReplyArgs Task
当然最后一行可以不写,直接将 reply
设置为 *Task
类型即可,一个简单的框架如下所示:
func (c *Coordinator) AskForTask(args *AskArgs, reply *Task) error { completedTask := args.LastTask // check worker's report to decide whether commit the map file if worker have a completed work { //do something here } // get a new task c.mutex.Lock() defer c.mutex.Unlock()
select { case task, ok := <-c.TaskQueue: if !ok { if c.MasterPhase == Exit { *reply = Task{TaskPhase: Exit} } else { *reply = Task{TaskPhase: Wait} } return nil } *reply = *task
c.TaskMeta[reply.TaskNumber].DeadLine = time.Now().Add(10 * time.Second) c.TaskMeta[reply.TaskNumber].MachineId = args.MachineId c.TaskMeta[reply.TaskNumber].TaskStatus = InProcess default: if c.MasterPhase == Exit { *reply = Task{TaskPhase: Exit} } else { *reply = Task{TaskPhase: Wait} } return nil }
return nil
}
注意这里,我们将 DeadLine
设置为 10s
,这是官网的 Hint
中建议的,至于其用处,在后面其他功能的实现部分会解释
下面解释一下为什么要用 select
,在最开始的版本中,我没有使用 select
,是直接 task, ok := <- c.TaskQueue
,但遇到了代码执行完 Map
阶段后卡住不动,debug
后发现是因为在 TaskQueue
这个 chan
中的内容被取完后,进入了阻塞状态,只有此 chan
中有新任务添加才会被唤起。经过 GPT4
的解答后,采用了 select
进行阻塞的处理,如果阻塞,那么我们将 reply
设置为 WAIT
或 EXIT
来告诉 Worker
应该做什么样的反应
而对于提交任务的部分,我们需要考虑几个条件:
- 由于
Worker
获取到Task
后可能出现宕机和卡死等情况,在这种情况下,Master
会将任务给其他Worker
处理,发生故障的Worker
所做的处理会作废 - 也是由于卡死,所以可能会出现提交的任务阶段滞后的情况,例如已经到了
Reduce
阶段,但此时提交上来的任务却还是Map
的任务,类似这种我们也应该作废
所以,我们对任务判断应该写为:
c.mutex.Lock()if task := c.TaskMeta[completedTask.TaskNumber]; task.MachineId == args.MachineId && task.TaskStatus == InProcess && completedTask.TaskPhase == c.MasterPhase {
c.TaskMeta[completedTask.TaskNumber].TaskStatus = Completed
if c.MasterPhase == Map { for id, file := range completedTask.Intermediates { c.Intermediates[id] = append(c.Intermediates[id], file) } }}c.mutex.Unlock()
如果检测通过,那么我们接受这次任务的提交,并将中间文件存在 Master
上(虽然这里做的处理只是将其路径存在 Master
中)
那么在 Worker
中,我们的结构可以这样:
// Worker// main/mrworker.go calls this function.func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
// Your worker implementation here. // uncomment to send the Example RPC to the coordinator.
machineId := strconv.Itoa(os.Getegid())
lastTask := Task{ TaskNumber: -1, }
flag := true
for flag {
ask := AskArgs{ MachineId: machineId, LastTask: lastTask, }
reply := Task{}
call("Coordinator.AskForTask", &ask, &reply)
currentTask := reply
// log.Printf("%v, %v", currentTask.nReduce, reply.nReduce)
// if in different computer, then need to send json data, master should decode json and store it into local disk switch currentTask.TaskPhase { case Map: mapPhase(¤tTask, mapf) case Reduce: reducePhase(¤tTask, reducef) case Wait: currentTask.TaskNumber = -1 time.Sleep(5 * time.Second) case Exit: flag = false }
lastTask = currentTask
}}
接下来,我们来实现 Worker
中的 map
与 reduce
,首先是 map
阶段 注意定义 KeyValue
结构体
func writeToLocal(x int, y int, content *[]KeyValue) string { dir, err := os.Getwd() if err != nil { log.Fatal("cannot get current working directory") } tempFile, err := os.CreateTemp(dir, "mr-tmp-*") if err != nil { log.Fatal("cannot create a temp file") } enc := json.NewEncoder(tempFile) for _, kv := range *content { if err := enc.Encode(&kv); err != nil { log.Fatalf("cannot encode %v", kv) } } err = tempFile.Close() if err != nil { log.Fatalf("cannot close file %v", tempFile.Name()) }
filename := fmt.Sprintf("mr-%d-%d", x, y) err = os.Rename(tempFile.Name(), filename) if err != nil { log.Fatalf("cannot rename %v to %v", tempFile.Name(), filename) } return filepath.Join(dir, filename)}
func mapPhase(task *Task, mapf func(string, string) []KeyValue) {
file, err := os.Open(task.InputFile) defer func(file *os.File) { err := file.Close() if err != nil { log.Fatalf("cannot close %v", task.InputFile) } }(file) if err != nil { log.Fatalf("cannot open %v", task.InputFile) } content, err := os.ReadFile(task.InputFile) if err != nil { log.Fatalf("cannot read %v", task.InputFile) }
kva := mapf(task.InputFile, string(content))
buffer := make([][]KeyValue, task.nReduce) for _, v := range kva { slot := ihash(v.Key) % task.nReduce buffer[slot] = append(buffer[slot], v) } output := make([]string, 0) for i := 0; i < task.nReduce; i++ { output = append(output, writeToLocal(task.TaskNumber, i, &buffer[i])) } task.Intermediates = output}
大部分都是从 mrsequential.go
中照搬来的代码,所以也没有特别多的解释(
比较值得注意的是,这里我们按照要求,模拟了一个 json
的数据传送
而对于 reduce
任务:
func readFromLocal(files []string) *[]KeyValue {
var kva []KeyValue for _, filename := range files { file, err := os.Open(filename) if err != nil { log.Fatalf("cannot open file %v", filename) } dec := json.NewDecoder(file) for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } kva = append(kva, kv) } } return &kva}
func reducePhase(task *Task, reducef func(string, []string) string) { intermediate := *readFromLocal(task.Intermediates) sort.Sort(ByKey(intermediate))
dir, _ := os.Getwd() tempFile, err := os.CreateTemp(dir, "mr-tmp-*") if err != nil { log.Fatalf("cannot create temp file") }
i := 0 for i < len(intermediate) { j := i + 1 for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key { j++ } var values []string for k := i; k < j; k++ { values = append(values, intermediate[k].Value) } output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output. _, err = fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output) if err != nil { log.Fatalf("Error: %v", err) }
i = j }
err = tempFile.Close() if err != nil { log.Fatalf("cannot close temp file %v", tempFile.Name()) } filename := fmt.Sprintf("mr-out-%d", task.TaskNumber) err = os.Rename(tempFile.Name(), filename) if err != nil { log.Fatalf("cannot rename file %v to %v", tempFile.Name(), filename) }}
同样也是照抄 mrsequential.go
的代码(
于是在 worker
这边,我们的任务就已全部做完了。
而我们还有其他要求,即 Worker
长时间不与 master
通信了,为了简化任务,设定该超时阈值为 10s
即可。为了支持这一点,我们的实现需要支持到:
master
追踪已分配Task
的运行情况,在Task
超出10s
仍未完成时,将该Task
重新分配给其他Worker
重试- 考虑
Task
上一次分配的Worker
可能仍在运行,重新分配后会出现两个Worker
同时运行同一个Task
的情况。要确保只有一个Worker
能够完成结果数据的最终写出,以免出现冲突,导致下游观察到重复或缺失的结果数据
第二点会相对复杂些,不过在 Lab 文档中也给出了提示 —— 实际上也是参考了 Google MapReduce 的做法,Worker
在写出数据时可以先写出到临时文件,最终确认没有问题后再将其重命名为正式结果文件,区分开了 Write 和 Commit 的过程。Commit 的过程可以是 Master 来执行,也可以是 Worker 来执行:
- Master Commit:Worker 向 Master 汇报 Task 完成,Master 确认该 Task 是否仍属于该 Worker,是则进行结果文件 Commit,否则直接忽略
- Worker Commit:Worker 向 Master 汇报 Task 完成,Master 确认该 Task 是否仍属于该 Worker 并响应 Worker,是则 Worker 进行结果文件 Commit,再向 Master 汇报 Commit 完成
但实际上这里,我们并不需要区分的如此明确,因为 Worker
会将文件上传到 master
的磁盘中,所以我们可以直接让 worker
进行重命名,也就是我们上面所做的那样。
而对于第一点,我们采用 goroutine
来解决(实际上有点像自动垃圾回收?
go func() { for { time.Sleep(10 * time.Second) c.mutex.Lock() for _, task := range c.TaskMeta { if task.MachineId != "" && time.Now().After(task.DeadLine) && task.TaskRef.TaskPhase == c.MasterPhase && task.TaskStatus == InProcess { task.MachineId = "" c.TaskQueue <- task.TaskRef task.TaskStatus = Idle } } c.mutex.Unlock() }}()
我们需要考虑的点比较多:
- 该任务是否被分配
- 是否超过时限(
DDL
) - 此任务当前的阶段是否与服务器的阶段不匹配(也就是是否为过期任务)
- 任务是否为
InProcess
如果都满足,那么我们将此任务重新加回队列中
我们还差最后一步尚未完成,也就是我们的服务器无法进行阶段转移:从 Map
到 Reduce
, 从 Reduce
到 Exit
并且我们也没有为 Reduce
阶段进行任务的创建
对于第一个,我们可以采用 goroutine
进行周期性的检测:
func (c *Coordinator) AllDone() bool { for _, task := range c.TaskMeta { if task.TaskStatus != Completed { return false } } return true}
go func() { for { c.mutex.Lock() if c.AllDone() { switch c.MasterPhase { case Map: c.MasterPhase = Reduce c.CreateReduceTask() case Reduce: c.MasterPhase = Exit } } c.mutex.Unlock() time.Sleep(time.Second) }}()
对于第二点,我们需要遍历中间生成文件,并创建相应数目的 Reduce
任务:
func (c *Coordinator) CreateReduceTask() { c.TaskMeta = make(map[int]*MasterTask) for idx, file := range c.Intermediates { task := Task{ TaskPhase: Reduce, nReduce: c.nReduce, InputFile: "", TaskNumber: idx, Intermediates: file, } c.TaskQueue <- &task c.TaskMeta[idx] = &MasterTask{ TaskStatus: Idle, DeadLine: time.Time{}, TaskRef: &task, MachineId: "", } }}
注意,上述的两个 goroutine
都放在 MakeCoordinator
执行即可
至此,我们的 MapReduce
框架就已完成,打开终端输入
bash test-mr.sh
进行测试即可