# MIT 6.824 Lab1:MapReduce
实验前请安装 Go
环境,编辑器我采用 VS Code
我的实验环境为
Manjaro Linux
,因此就不用虚拟机了并且由于官网的不公开协议,我不会上传代码到
github
上,博客也不会挂在搜索引擎上
# 实验要求
使用 Go
实现一个简单的 MapReduce
框架,实现的功能为简单的 Word Count
#
MapReduce
框架简介
这个简单版本的 MapReduce
省去了原文中的一些步骤(例如切分文件等),所以这里解释我们需要实现的 MapReduce
框架是什么样的。
由于我们是使用单机器多进程来模拟多机器的,所以下面我都以进程来描述原框架中的各个机器
其原理十分简单,从宏观上而言,一个MapReduce
框架由一个 master
进程(这里应该叫 coordinator
?但字母太多了,统一用 matser
说吧)和多个 Worker
进程组成。
master
扮演着服务器的角色,其追踪着各个Task
的状态与阶段状态,为Worker
分配Task
并处理Worker
已完成的任务(如果需要处理的话)Worker
扮演着驴的角色,只负责做从master
那要来的任务,做完就一直问master
要新的任务,并把完成的任务上报给master
# 框架各个部分的设计
官网上给出了很多的要求,直接看可能会眼花缭乱不知道应该如何下手(说的就是我),所以这里对照要求一步一步的给出设计思路
#
Task
首先我们明确这个 Worker
处理的最小单位,任何一个 Map
/ Reduce
任务都可以视为一个 Task
, 一个完整的 MapReduce
过程被称为一个 Job
(但在这里我们不会考虑 Job
这个概念,因为都是只做一次 MapReduce
过程)
对于一次 Task
,事实上这是 Worker
通过 RPC
从 master
中拿到的关于任务的一切信息,所以我们必须设计一些冗余项来保证信息是完整的。
我们可以分步来进行设计,首先,显然我们设计如下:
type Phase int
const (
Map Phase = iota
Reduce
Exit
Wait
)
type Task struct {
TaskPhase Phase // task phase for worker
InputFile string // input file name
}
这样,对于 Worker
而言,我们就知道了
- 当前应该执行的是
Map
任务还是Reduce
任务 - 任务的输入文件
但考虑我们的任务为 word count
, 我们的 Map
任务需要给出中间生成的文件,并且中间生成文件的命名为 mr-x-y
,其中 x
为这是第几个 Map
任务(也就是这是第几个输入文件),y
为相同键值所在的 Reduce
任务的编号,例如 y = ihash(key) % nReduce
意思为键为 Key
的Kv
应当让索引为 y
的 Reduce
来处理
在 work.go
中已给出了 ihash()
的实现,所以我们还需要的是 nReduce
的值,并且我们还需要一个数组来存储中间生成文件的文件名(因为是本地所以直接存文件名就好) ,因此我们二次设计如下:
type Task struct {
TaskPhase Phase // task phase for worker
nReduce int // for map task
InputFile string // input file name
Intermediates []string // map task output
}
这样我们做完了最初版本的设计,这样的 Task
就可以在 Worker
与 master
中传输了
#
Master
master
需要有以下功能:
在启动时根据指定的输入文件数及
Reduce Task
数,生成Map Task
及Reduce Task
响应
Worker
的Task
申请RPC
请求,分配可用的Task
给到Worker
处理追踪
Task
的完成情况,在所有Map Task
完成后进入Reduce
阶段,开始派发Reduce Task
;在所有Reduce Task
完成后标记作业已完成并退出
由于我们需要对 Task
的完成情况进行追踪,并且需要给 Worker
任务,那么这里我们可以通过 map
与 chan
来实现这两个功能
首先,我们设计对 Task
任务的追踪:
type MasterTaskStatus int
const (
Idle MasterTaskStatus = iota
InProcess
Completed
)
// MasterTask
// Master's data structure , store every status of Map/Reduce task
type MasterTask struct {
TaskStatus MasterTaskStatus // status of task
TaskRef *Task // Reference of task
MachineId string // machine id
}
这里对 MasterTask
的设计借鉴了原论文中的设计,包括 TaskStatus
与 MachineId
这样,我们就可以完成对 Task
的追踪,接下来就可以对 Master
的结构进行设计了:
type Coordinator struct {
// Your definitions here.
mutex sync.Mutex // mutex
TaskQueue chan *Task // task queue
TaskMeta map[int]*MasterTask // task meta info
nReduce int // number of Reduce task
MasterPhase Phase // phase of all task
InputFiles []string // Input files
Intermediates [][]string // map task output
}
mutex
为互斥锁,用于并发控制nReduce
用以传送给每一个Task
- 消息队列
TaskQueue
用以为Worker
发送Task
,由于chan
自带锁,这样就避免了我们自己手写一个带锁的队列( TaskMeta
为一个map
,其键值为index
,实际上就是文件的序号,当然在这里可以当作Task
的索引,值为MasterTask
,注意这个结构体实际上是包含了Task
的MasterPhase
记录了当前MapReduce
任务已经进行到了什么阶段InputFiles
记录了Map
任务的输入文件Intermediates
为一个文件列表,由于在前面我们提到,Map
任务会生成一系列中间文件mr-X-Y
,其中Y
为Reduce
的编号,所以在这里,我们需要把中间文件存入这里,Intermediates[Y] = mr-i-Y
其中 $0 \leq i < mapTask $
我们在这里添加了 TaskMeta
之后,显然我们在 Task
中还需要一个值来记录,我们当前的 Task
在 Master
中所对应的元数据应该是什么,所以我们设计 Task
如下所示:
type Task struct {
TaskPhase Phase // task phase for worker
nReduce int // for map task
InputFile string // input file name
TaskNumber int // index of task in MasterMeta
Intermediates []string // map task output
}
(我发誓这是最终版本了)
# 功能实现
首先最简单的功能,应该是 RPC
中 master
与 worker
之间的任务派发,我们从这一步开始写,但首先,我们需要明确在 master
初始化时需要完成的事情:
func MakeCoordinator(files []string, nReduce int) *Coordinator {
max := func(a int, b int) int {
if a > b {
return a
}
return b
}
c := Coordinator{
MasterPhase: Map,
TaskMeta: make(map[int]*MasterTask),
TaskQueue: make(chan *Task, max(len(files), nReduce)),
InputFiles: files,
Intermediates: make([][]string, nReduce),
}
// Your code here.
// init map tasks without machine id (not send yet)
for idx, file := range files {
task := Task{
TaskPhase: Map,
nReduce: nReduce,
InputFile: file,
TaskNumber: idx,
}
c.TaskQueue <- &task
c.TaskMeta[idx] = &MasterTask{
TaskStatus: Idle,
TaskRef: &task,
MachineId: "",
}
}
c.server()
return &c
}
由于 RPC
的存在,因此我们需要设定请求参数与返回参数。
在这里,由于我们跟踪 Task
的完成情况,所以这就要求 Worker
需要向 Master
报告自己是否完成了上一个任务,Master
又会向 Worker
发送新的任务,因此这两步我们可以合成一个部分来看待。
于是我们可以设计请求参数与返回参数如下:
type AskArgs struct {
MachineId string // identity of workers
LastTask Task // last completed task
}
type ReplyArgs Task
当然最后一行可以不写,直接将 reply
设置为 *Task
类型即可,一个简单的框架如下所示:
func (c *Coordinator) AskForTask(args *AskArgs, reply *Task) error {
completedTask := args.LastTask
// check worker's report to decide whether commit the map file
if worker have a completed work {
//do something here
}
// get a new task
c.mutex.Lock()
defer c.mutex.Unlock()
select {
case task, ok := <-c.TaskQueue:
if !ok {
if c.MasterPhase == Exit {
*reply = Task{TaskPhase: Exit}
} else {
*reply = Task{TaskPhase: Wait}
}
return nil
}
*reply = *task
c.TaskMeta[reply.TaskNumber].DeadLine = time.Now().Add(10 * time.Second)
c.TaskMeta[reply.TaskNumber].MachineId = args.MachineId
c.TaskMeta[reply.TaskNumber].TaskStatus = InProcess
default:
if c.MasterPhase == Exit {
*reply = Task{TaskPhase: Exit}
} else {
*reply = Task{TaskPhase: Wait}
}
return nil
}
return nil
}
注意这里,我们将 DeadLine
设置为 10s
,这是官网的 Hint
中建议的,至于其用处,在后面其他功能的实现部分会解释
下面解释一下为什么要用 select
,在最开始的版本中,我没有使用 select
,是直接 task, ok := <- c.TaskQueue
,但遇到了代码执行完 Map
阶段后卡住不动,debug
后发现是因为在 TaskQueue
这个 chan
中的内容被取完后,进入了阻塞状态,只有此 chan
中有新任务添加才会被唤起。经过 GPT4
的解答后,采用了 select
进行阻塞的处理,如果阻塞,那么我们将 reply
设置为 WAIT
或 EXIT
来告诉 Worker
应该做什么样的反应
而对于提交任务的部分,我们需要考虑几个条件:
- 由于
Worker
获取到Task
后可能出现宕机和卡死等情况,在这种情况下,Master
会将任务给其他Worker
处理,发生故障的Worker
所做的处理会作废 - 也是由于卡死,所以可能会出现提交的任务阶段滞后的情况,例如已经到了
Reduce
阶段,但此时提交上来的任务却还是Map
的任务,类似这种我们也应该作废
所以,我们对任务判断应该写为:
c.mutex.Lock()
if task := c.TaskMeta[completedTask.TaskNumber]; task.MachineId == args.MachineId &&
task.TaskStatus == InProcess && completedTask.TaskPhase == c.MasterPhase {
c.TaskMeta[completedTask.TaskNumber].TaskStatus = Completed
if c.MasterPhase == Map {
for id, file := range completedTask.Intermediates {
c.Intermediates[id] = append(c.Intermediates[id], file)
}
}
}
c.mutex.Unlock()
如果检测通过,那么我们接受这次任务的提交,并将中间文件存在 Master
上(虽然这里做的处理只是将其路径存在 Master
中)
那么在 Worker
中,我们的结构可以这样:
// Worker
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// Your worker implementation here.
// uncomment to send the Example RPC to the coordinator.
machineId := strconv.Itoa(os.Getegid())
lastTask := Task{
TaskNumber: -1,
}
flag := true
for flag {
ask := AskArgs{
MachineId: machineId,
LastTask: lastTask,
}
reply := Task{}
call("Coordinator.AskForTask", &ask, &reply)
currentTask := reply
// log.Printf("%v, %v", currentTask.nReduce, reply.nReduce)
// if in different computer, then need to send json data, master should decode json and store it into local disk
switch currentTask.TaskPhase {
case Map:
mapPhase(¤tTask, mapf)
case Reduce:
reducePhase(¤tTask, reducef)
case Wait:
currentTask.TaskNumber = -1
time.Sleep(5 * time.Second)
case Exit:
flag = false
}
lastTask = currentTask
}
}
接下来,我们来实现 Worker
中的 map
与 reduce
,首先是 map
阶段 注意定义 KeyValue
结构体
func writeToLocal(x int, y int, content *[]KeyValue) string {
dir, err := os.Getwd()
if err != nil {
log.Fatal("cannot get current working directory")
}
tempFile, err := os.CreateTemp(dir, "mr-tmp-*")
if err != nil {
log.Fatal("cannot create a temp file")
}
enc := json.NewEncoder(tempFile)
for _, kv := range *content {
if err := enc.Encode(&kv); err != nil {
log.Fatalf("cannot encode %v", kv)
}
}
err = tempFile.Close()
if err != nil {
log.Fatalf("cannot close file %v", tempFile.Name())
}
filename := fmt.Sprintf("mr-%d-%d", x, y)
err = os.Rename(tempFile.Name(), filename)
if err != nil {
log.Fatalf("cannot rename %v to %v", tempFile.Name(), filename)
}
return filepath.Join(dir, filename)
}
func mapPhase(task *Task, mapf func(string, string) []KeyValue) {
file, err := os.Open(task.InputFile)
defer func(file *os.File) {
err := file.Close()
if err != nil {
log.Fatalf("cannot close %v", task.InputFile)
}
}(file)
if err != nil {
log.Fatalf("cannot open %v", task.InputFile)
}
content, err := os.ReadFile(task.InputFile)
if err != nil {
log.Fatalf("cannot read %v", task.InputFile)
}
kva := mapf(task.InputFile, string(content))
buffer := make([][]KeyValue, task.nReduce)
for _, v := range kva {
slot := ihash(v.Key) % task.nReduce
buffer[slot] = append(buffer[slot], v)
}
output := make([]string, 0)
for i := 0; i < task.nReduce; i++ {
output = append(output, writeToLocal(task.TaskNumber, i, &buffer[i]))
}
task.Intermediates = output
}
大部分都是从 mrsequential.go
中照搬来的代码,所以也没有特别多的解释(
比较值得注意的是,这里我们按照要求,模拟了一个 json
的数据传送
而对于 reduce
任务:
func readFromLocal(files []string) *[]KeyValue {
var kva []KeyValue
for _, filename := range files {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open file %v", filename)
}
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}
}
return &kva
}
func reducePhase(task *Task, reducef func(string, []string) string) {
intermediate := *readFromLocal(task.Intermediates)
sort.Sort(ByKey(intermediate))
dir, _ := os.Getwd()
tempFile, err := os.CreateTemp(dir, "mr-tmp-*")
if err != nil {
log.Fatalf("cannot create temp file")
}
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
var values []string
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output.
_, err = fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
if err != nil {
log.Fatalf("Error: %v", err)
}
i = j
}
err = tempFile.Close()
if err != nil {
log.Fatalf("cannot close temp file %v", tempFile.Name())
}
filename := fmt.Sprintf("mr-out-%d", task.TaskNumber)
err = os.Rename(tempFile.Name(), filename)
if err != nil {
log.Fatalf("cannot rename file %v to %v", tempFile.Name(), filename)
}
}
同样也是照抄 mrsequential.go
的代码(
于是在 worker
这边,我们的任务就已全部做完了。
而我们还有其他要求,即 Worker
长时间不与 master
通信了,为了简化任务,设定该超时阈值为 10s
即可。为了支持这一点,我们的实现需要支持到:
master
追踪已分配Task
的运行情况,在Task
超出10s
仍未完成时,将该Task
重新分配给其他Worker
重试- 考虑
Task
上一次分配的Worker
可能仍在运行,重新分配后会出现两个Worker
同时运行同一个Task
的情况。要确保只有一个Worker
能够完成结果数据的最终写出,以免出现冲突,导致下游观察到重复或缺失的结果数据
第二点会相对复杂些,不过在 Lab 文档中也给出了提示 —— 实际上也是参考了 Google MapReduce 的做法,Worker
在写出数据时可以先写出到临时文件,最终确认没有问题后再将其重命名为正式结果文件,区分开了 Write 和 Commit 的过程。Commit 的过程可以是 Master 来执行,也可以是 Worker 来执行:
- Master Commit:Worker 向 Master 汇报 Task 完成,Master 确认该 Task 是否仍属于该 Worker,是则进行结果文件 Commit,否则直接忽略
- Worker Commit:Worker 向 Master 汇报 Task 完成,Master 确认该 Task 是否仍属于该 Worker 并响应 Worker,是则 Worker 进行结果文件 Commit,再向 Master 汇报 Commit 完成
但实际上这里,我们并不需要区分的如此明确,因为 Worker
会将文件上传到 master
的磁盘中,所以我们可以直接让 worker
进行重命名,也就是我们上面所做的那样。
而对于第一点,我们采用 goroutine
来解决(实际上有点像自动垃圾回收?
go func() {
for {
time.Sleep(10 * time.Second)
c.mutex.Lock()
for _, task := range c.TaskMeta {
if task.MachineId != "" && time.Now().After(task.DeadLine) &&
task.TaskRef.TaskPhase == c.MasterPhase && task.TaskStatus == InProcess {
task.MachineId = ""
c.TaskQueue <- task.TaskRef
task.TaskStatus = Idle
}
}
c.mutex.Unlock()
}
}()
我们需要考虑的点比较多:
- 该任务是否被分配
- 是否超过时限(
DDL
) - 此任务当前的阶段是否与服务器的阶段不匹配(也就是是否为过期任务)
- 任务是否为
InProcess
如果都满足,那么我们将此任务重新加回队列中
我们还差最后一步尚未完成,也就是我们的服务器无法进行阶段转移:从 Map
到 Reduce
, 从 Reduce
到 Exit
并且我们也没有为 Reduce
阶段进行任务的创建
对于第一个,我们可以采用 goroutine
进行周期性的检测:
func (c *Coordinator) AllDone() bool {
for _, task := range c.TaskMeta {
if task.TaskStatus != Completed {
return false
}
}
return true
}
go func() {
for {
c.mutex.Lock()
if c.AllDone() {
switch c.MasterPhase {
case Map:
c.MasterPhase = Reduce
c.CreateReduceTask()
case Reduce:
c.MasterPhase = Exit
}
}
c.mutex.Unlock()
time.Sleep(time.Second)
}
}()
对于第二点,我们需要遍历中间生成文件,并创建相应数目的 Reduce
任务:
func (c *Coordinator) CreateReduceTask() {
c.TaskMeta = make(map[int]*MasterTask)
for idx, file := range c.Intermediates {
task := Task{
TaskPhase: Reduce,
nReduce: c.nReduce,
InputFile: "",
TaskNumber: idx,
Intermediates: file,
}
c.TaskQueue <- &task
c.TaskMeta[idx] = &MasterTask{
TaskStatus: Idle,
DeadLine: time.Time{},
TaskRef: &task,
MachineId: "",
}
}
}
注意,上述的两个 goroutine
都放在 MakeCoordinator
执行即可
至此,我们的 MapReduce
框架就已完成,打开终端输入
bash test-mr.sh
进行测试即可