实验前请安装 Go 环境,编辑器我采用 VS Code

我的实验环境为 Manjaro Linux,因此就不用虚拟机了

并且由于官网的不公开协议,我不会上传代码到 github 上,博客也不会挂在搜索引擎上

实验要求

使用 Go 实现一个简单的 MapReduce 框架,实现的功能为简单的 Word Count

MapReduce 框架简介

这个简单版本的 MapReduce 省去了原文中的一些步骤(例如切分文件等),所以这里解释我们需要实现的 MapReduce 框架是什么样的。

由于我们是使用单机器多进程来模拟多机器的,所以下面我都以进程来描述原框架中的各个机器

其原理十分简单,从宏观上而言,一个MapReduce 框架由一个 master 进程(这里应该叫 coordinator?但字母太多了,统一用 matser 说吧)和多个 Worker 进程组成。

  • master 扮演着服务器的角色,其追踪着各个 Task 的状态与阶段状态,为 Worker 分配 Task 并处理 Worker 已完成的任务(如果需要处理的话)
  • Worker 扮演着驴的角色,只负责做从 master 那要来的任务,做完就一直问 master 要新的任务,并把完成的任务上报给 master

框架各个部分的设计

官网上给出了很多的要求,直接看可能会眼花缭乱不知道应该如何下手(说的就是我),所以这里对照要求一步一步的给出设计思路

Task

首先我们明确这个 Worker 处理的最小单位,任何一个 Map / Reduce 任务都可以视为一个 Task , 一个完整的 MapReduce 过程被称为一个 Job (但在这里我们不会考虑 Job 这个概念,因为都是只做一次 MapReduce 过程)

对于一次 Task ,事实上这是 Worker 通过 RPCmaster 中拿到的关于任务的一切信息,所以我们必须设计一些冗余项来保证信息是完整的。

我们可以分步来进行设计,首先,显然我们设计如下:

type Phase int
const (
Map Phase = iota
Reduce
Exit
Wait
)
type Task struct {
TaskPhase Phase // task phase for worker
InputFile string // input file name
}

这样,对于 Worker 而言,我们就知道了

  1. 当前应该执行的是 Map 任务还是 Reduce 任务
  2. 任务的输入文件

但考虑我们的任务为 word count, 我们的 Map 任务需要给出中间生成的文件,并且中间生成文件的命名为 mr-x-y,其中 x 为这是第几个 Map 任务(也就是这是第几个输入文件),y 为相同键值所在的 Reduce 任务的编号,例如 y = ihash(key) % nReduce 意思为键为 KeyKv 应当让索引为 yReduce 来处理

work.go 中已给出了 ihash() 的实现,所以我们还需要的是 nReduce 的值,并且我们还需要一个数组来存储中间生成文件的文件名(因为是本地所以直接存文件名就好) ,因此我们二次设计如下:

type Task struct {
TaskPhase Phase // task phase for worker
nReduce int // for map task
InputFile string // input file name
Intermediates []string // map task output
}

这样我们做完了最初版本的设计,这样的 Task 就可以在 Workermaster 中传输了

Master

master 需要有以下功能:

  1. 在启动时根据指定的输入文件数及 Reduce Task 数,生成 Map TaskReduce Task

  2. 响应 WorkerTask 申请 RPC 请求,分配可用的 Task 给到 Worker 处理

  3. 追踪 Task 的完成情况,在所有 Map Task 完成后进入 Reduce 阶段,开始派发 Reduce Task;在所有 Reduce Task 完成后标记作业已完成并退出

由于我们需要对 Task 的完成情况进行追踪,并且需要给 Worker 任务,那么这里我们可以通过 mapchan 来实现这两个功能

首先,我们设计对 Task 任务的追踪:

type MasterTaskStatus int
const (
Idle MasterTaskStatus = iota
InProcess
Completed
)
// MasterTask
// Master's data structure , store every status of Map/Reduce task
type MasterTask struct {
TaskStatus MasterTaskStatus // status of task
TaskRef *Task // Reference of task
MachineId string // machine id
}

这里对 MasterTask 的设计借鉴了原论文中的设计,包括 TaskStatusMachineId

这样,我们就可以完成对 Task 的追踪,接下来就可以对 Master 的结构进行设计了:

type Coordinator struct {
// Your definitions here.
mutex sync.Mutex // mutex
TaskQueue chan *Task // task queue
TaskMeta map[int]*MasterTask // task meta info
nReduce int // number of Reduce task
MasterPhase Phase // phase of all task
InputFiles []string // Input files
Intermediates [][]string // map task output
}
  1. mutex 为互斥锁,用于并发控制
  2. nReduce 用以传送给每一个 Task
  3. 消息队列 TaskQueue 用以为 Worker 发送 Task,由于 chan 自带锁,这样就避免了我们自己手写一个带锁的队列(
  4. TaskMeta 为一个 map,其键值为 index,实际上就是文件的序号,当然在这里可以当作 Task 的索引,值为 MasterTask,注意这个结构体实际上是包含了 Task
  5. MasterPhase 记录了当前 MapReduce 任务已经进行到了什么阶段
  6. InputFiles 记录了 Map 任务的输入文件
  7. Intermediates 为一个文件列表,由于在前面我们提到,Map 任务会生成一系列中间文件 mr-X-Y,其中 YReduce 的编号,所以在这里,我们需要把中间文件存入这里,Intermediates[Y] = mr-i-Y 其中

我们在这里添加了 TaskMeta 之后,显然我们在 Task 中还需要一个值来记录,我们当前的 TaskMaster 中所对应的元数据应该是什么,所以我们设计 Task 如下所示:

type Task struct {
TaskPhase Phase // task phase for worker
nReduce int // for map task
InputFile string // input file name
TaskNumber int // index of task in MasterMeta
Intermediates []string // map task output
}

(我发誓这是最终版本了)

功能实现

首先最简单的功能,应该是 RPCmasterworker 之间的任务派发,我们从这一步开始写,但首先,我们需要明确在 master 初始化时需要完成的事情:

func MakeCoordinator(files []string, nReduce int) *Coordinator {
max := func(a int, b int) int {
if a > b {
return a
}
return b
}
c := Coordinator{
MasterPhase: Map,
TaskMeta: make(map[int]*MasterTask),
TaskQueue: make(chan *Task, max(len(files), nReduce)),
InputFiles: files,
Intermediates: make([][]string, nReduce),
}
// Your code here.
// init map tasks without machine id (not send yet)
for idx, file := range files {
task := Task{
TaskPhase: Map,
nReduce: nReduce,
InputFile: file,
TaskNumber: idx,
}
c.TaskQueue <- &task
c.TaskMeta[idx] = &MasterTask{
TaskStatus: Idle,
TaskRef: &task,
MachineId: "",
}
}
c.server()
return &c
}

由于 RPC 的存在,因此我们需要设定请求参数与返回参数。

在这里,由于我们跟踪 Task 的完成情况,所以这就要求 Worker 需要向 Master 报告自己是否完成了上一个任务,Master 又会向 Worker 发送新的任务,因此这两步我们可以合成一个部分来看待。

于是我们可以设计请求参数与返回参数如下:

type AskArgs struct {
MachineId string // identity of workers
LastTask Task // last completed task
}
type ReplyArgs Task

当然最后一行可以不写,直接将 reply 设置为 *Task 类型即可,一个简单的框架如下所示:

func (c *Coordinator) AskForTask(args *AskArgs, reply *Task) error {
completedTask := args.LastTask
// check worker's report to decide whether commit the map file
if worker have a completed work {
//do something here
}
// get a new task
c.mutex.Lock()
defer c.mutex.Unlock()
select {
case task, ok := <-c.TaskQueue:
if !ok {
if c.MasterPhase == Exit {
*reply = Task{TaskPhase: Exit}
} else {
*reply = Task{TaskPhase: Wait}
}
return nil
}
*reply = *task
c.TaskMeta[reply.TaskNumber].DeadLine = time.Now().Add(10 * time.Second)
c.TaskMeta[reply.TaskNumber].MachineId = args.MachineId
c.TaskMeta[reply.TaskNumber].TaskStatus = InProcess
default:
if c.MasterPhase == Exit {
*reply = Task{TaskPhase: Exit}
} else {
*reply = Task{TaskPhase: Wait}
}
return nil
}
return nil
}

注意这里,我们将 DeadLine 设置为 10s ,这是官网的 Hint 中建议的,至于其用处,在后面其他功能的实现部分会解释

下面解释一下为什么要用 select,在最开始的版本中,我没有使用 select ,是直接 task, ok := <- c.TaskQueue ,但遇到了代码执行完 Map 阶段后卡住不动,debug 后发现是因为在 TaskQueue 这个 chan 中的内容被取完后,进入了阻塞状态,只有此 chan 中有新任务添加才会被唤起。经过 GPT4 的解答后,采用了 select 进行阻塞的处理,如果阻塞,那么我们将 reply 设置为 WAITEXIT 来告诉 Worker 应该做什么样的反应

而对于提交任务的部分,我们需要考虑几个条件:

  1. 由于 Worker 获取到 Task 后可能出现宕机和卡死等情况,在这种情况下, Master 会将任务给其他 Worker 处理,发生故障的 Worker 所做的处理会作废
  2. 也是由于卡死,所以可能会出现提交的任务阶段滞后的情况,例如已经到了 Reduce 阶段,但此时提交上来的任务却还是 Map 的任务,类似这种我们也应该作废

所以,我们对任务判断应该写为:

c.mutex.Lock()
if task := c.TaskMeta[completedTask.TaskNumber]; task.MachineId == args.MachineId &&
task.TaskStatus == InProcess && completedTask.TaskPhase == c.MasterPhase {
c.TaskMeta[completedTask.TaskNumber].TaskStatus = Completed
if c.MasterPhase == Map {
for id, file := range completedTask.Intermediates {
c.Intermediates[id] = append(c.Intermediates[id], file)
}
}
}
c.mutex.Unlock()

如果检测通过,那么我们接受这次任务的提交,并将中间文件存在 Master 上(虽然这里做的处理只是将其路径存在 Master 中)

那么在 Worker 中,我们的结构可以这样:

// Worker
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// Your worker implementation here.
// uncomment to send the Example RPC to the coordinator.
machineId := strconv.Itoa(os.Getegid())
lastTask := Task{
TaskNumber: -1,
}
flag := true
for flag {
ask := AskArgs{
MachineId: machineId,
LastTask: lastTask,
}
reply := Task{}
call("Coordinator.AskForTask", &ask, &reply)
currentTask := reply
// log.Printf("%v, %v", currentTask.nReduce, reply.nReduce)
// if in different computer, then need to send json data, master should decode json and store it into local disk
switch currentTask.TaskPhase {
case Map:
mapPhase(&currentTask, mapf)
case Reduce:
reducePhase(&currentTask, reducef)
case Wait:
currentTask.TaskNumber = -1
time.Sleep(5 * time.Second)
case Exit:
flag = false
}
lastTask = currentTask
}
}

接下来,我们来实现 Worker 中的 mapreduce,首先是 map 阶段 注意定义 KeyValue 结构体

func writeToLocal(x int, y int, content *[]KeyValue) string {
dir, err := os.Getwd()
if err != nil {
log.Fatal("cannot get current working directory")
}
tempFile, err := os.CreateTemp(dir, "mr-tmp-*")
if err != nil {
log.Fatal("cannot create a temp file")
}
enc := json.NewEncoder(tempFile)
for _, kv := range *content {
if err := enc.Encode(&kv); err != nil {
log.Fatalf("cannot encode %v", kv)
}
}
err = tempFile.Close()
if err != nil {
log.Fatalf("cannot close file %v", tempFile.Name())
}
filename := fmt.Sprintf("mr-%d-%d", x, y)
err = os.Rename(tempFile.Name(), filename)
if err != nil {
log.Fatalf("cannot rename %v to %v", tempFile.Name(), filename)
}
return filepath.Join(dir, filename)
}
func mapPhase(task *Task, mapf func(string, string) []KeyValue) {
file, err := os.Open(task.InputFile)
defer func(file *os.File) {
err := file.Close()
if err != nil {
log.Fatalf("cannot close %v", task.InputFile)
}
}(file)
if err != nil {
log.Fatalf("cannot open %v", task.InputFile)
}
content, err := os.ReadFile(task.InputFile)
if err != nil {
log.Fatalf("cannot read %v", task.InputFile)
}
kva := mapf(task.InputFile, string(content))
buffer := make([][]KeyValue, task.nReduce)
for _, v := range kva {
slot := ihash(v.Key) % task.nReduce
buffer[slot] = append(buffer[slot], v)
}
output := make([]string, 0)
for i := 0; i < task.nReduce; i++ {
output = append(output, writeToLocal(task.TaskNumber, i, &buffer[i]))
}
task.Intermediates = output
}

大部分都是从 mrsequential.go 中照搬来的代码,所以也没有特别多的解释(

比较值得注意的是,这里我们按照要求,模拟了一个 json 的数据传送

而对于 reduce 任务:

func readFromLocal(files []string) *[]KeyValue {
var kva []KeyValue
for _, filename := range files {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open file %v", filename)
}
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}
}
return &kva
}
func reducePhase(task *Task, reducef func(string, []string) string) {
intermediate := *readFromLocal(task.Intermediates)
sort.Sort(ByKey(intermediate))
dir, _ := os.Getwd()
tempFile, err := os.CreateTemp(dir, "mr-tmp-*")
if err != nil {
log.Fatalf("cannot create temp file")
}
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
var values []string
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output.
_, err = fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
if err != nil {
log.Fatalf("Error: %v", err)
}
i = j
}
err = tempFile.Close()
if err != nil {
log.Fatalf("cannot close temp file %v", tempFile.Name())
}
filename := fmt.Sprintf("mr-out-%d", task.TaskNumber)
err = os.Rename(tempFile.Name(), filename)
if err != nil {
log.Fatalf("cannot rename file %v to %v", tempFile.Name(), filename)
}
}

同样也是照抄 mrsequential.go 的代码(

于是在 worker 这边,我们的任务就已全部做完了。

而我们还有其他要求,即 Worker 长时间不与 master 通信了,为了简化任务,设定该超时阈值为 10s 即可。为了支持这一点,我们的实现需要支持到:

  1. master 追踪已分配 Task 的运行情况,在 Task 超出 10s 仍未完成时,将该 Task 重新分配给其他 Worker 重试
  2. 考虑 Task 上一次分配的 Worker 可能仍在运行,重新分配后会出现两个 Worker 同时运行同一个 Task 的情况。要确保只有一个 Worker 能够完成结果数据的最终写出,以免出现冲突,导致下游观察到重复或缺失的结果数据

第二点会相对复杂些,不过在 Lab 文档中也给出了提示 —— 实际上也是参考了 Google MapReduce 的做法,Worker 在写出数据时可以先写出到临时文件,最终确认没有问题后再将其重命名为正式结果文件,区分开了 Write 和 Commit 的过程。Commit 的过程可以是 Master 来执行,也可以是 Worker 来执行:

  • Master Commit:Worker 向 Master 汇报 Task 完成,Master 确认该 Task 是否仍属于该 Worker,是则进行结果文件 Commit,否则直接忽略
  • Worker Commit:Worker 向 Master 汇报 Task 完成,Master 确认该 Task 是否仍属于该 Worker 并响应 Worker,是则 Worker 进行结果文件 Commit,再向 Master 汇报 Commit 完成

但实际上这里,我们并不需要区分的如此明确,因为 Worker 会将文件上传到 master 的磁盘中,所以我们可以直接让 worker 进行重命名,也就是我们上面所做的那样。

而对于第一点,我们采用 goroutine 来解决(实际上有点像自动垃圾回收?

go func() {
for {
time.Sleep(10 * time.Second)
c.mutex.Lock()
for _, task := range c.TaskMeta {
if task.MachineId != "" && time.Now().After(task.DeadLine) &&
task.TaskRef.TaskPhase == c.MasterPhase && task.TaskStatus == InProcess {
task.MachineId = ""
c.TaskQueue <- task.TaskRef
task.TaskStatus = Idle
}
}
c.mutex.Unlock()
}
}()

我们需要考虑的点比较多:

  1. 该任务是否被分配
  2. 是否超过时限(DDL
  3. 此任务当前的阶段是否与服务器的阶段不匹配(也就是是否为过期任务)
  4. 任务是否为 InProcess

如果都满足,那么我们将此任务重新加回队列中

我们还差最后一步尚未完成,也就是我们的服务器无法进行阶段转移:从 MapReduce, 从 ReduceExit

并且我们也没有为 Reduce 阶段进行任务的创建

对于第一个,我们可以采用 goroutine 进行周期性的检测:

func (c *Coordinator) AllDone() bool {
for _, task := range c.TaskMeta {
if task.TaskStatus != Completed {
return false
}
}
return true
}
go func() {
for {
c.mutex.Lock()
if c.AllDone() {
switch c.MasterPhase {
case Map:
c.MasterPhase = Reduce
c.CreateReduceTask()
case Reduce:
c.MasterPhase = Exit
}
}
c.mutex.Unlock()
time.Sleep(time.Second)
}
}()

对于第二点,我们需要遍历中间生成文件,并创建相应数目的 Reduce 任务:

func (c *Coordinator) CreateReduceTask() {
c.TaskMeta = make(map[int]*MasterTask)
for idx, file := range c.Intermediates {
task := Task{
TaskPhase: Reduce,
nReduce: c.nReduce,
InputFile: "",
TaskNumber: idx,
Intermediates: file,
}
c.TaskQueue <- &task
c.TaskMeta[idx] = &MasterTask{
TaskStatus: Idle,
DeadLine: time.Time{},
TaskRef: &task,
MachineId: "",
}
}
}

注意,上述的两个 goroutine 都放在 MakeCoordinator 执行即可

至此,我们的 MapReduce 框架就已完成,打开终端输入

Terminal window
bash test-mr.sh

进行测试即可

实验结果