实验前请安装 Go
环境,编辑器我采用 VS Code
我的实验环境为 Manjaro Linux
,因此就不用虚拟机了
并且由于官网的不公开协议,我不会上传代码到 github
上,博客也不会挂在搜索引擎上
实验要求
使用 Go
实现一个简单的 MapReduce
框架,实现的功能为简单的 Word Count
MapReduce
框架简介
这个简单版本的 MapReduce
省去了原文中的一些步骤(例如切分文件等),所以这里解释我们需要实现的 MapReduce
框架是什么样的。
由于我们是使用单机器多进程来模拟多机器的,所以下面我都以进程来描述原框架中的各个机器
其原理十分简单,从宏观上而言,一个MapReduce
框架由一个 master
进程(这里应该叫 coordinator
?但字母太多了,统一用 matser
说吧)和多个 Worker
进程组成。
master
扮演着服务器的角色,其追踪着各个 Task
的状态与阶段状态,为 Worker
分配 Task
并处理 Worker
已完成的任务(如果需要处理的话)
Worker
扮演着驴的角色,只负责做从 master
那要来的任务,做完就一直问 master
要新的任务,并把完成的任务上报给 master
框架各个部分的设计
官网上给出了很多的要求,直接看可能会眼花缭乱不知道应该如何下手(说的就是我),所以这里对照要求一步一步的给出设计思路
Task
首先我们明确这个 Worker
处理的最小单位,任何一个 Map
/ Reduce
任务都可以视为一个 Task
, 一个完整的 MapReduce
过程被称为一个 Job
(但在这里我们不会考虑 Job
这个概念,因为都是只做一次 MapReduce
过程)
对于一次 Task
,事实上这是 Worker
通过 RPC
从 master
中拿到的关于任务的一切信息,所以我们必须设计一些冗余项来保证信息是完整的。
我们可以分步来进行设计,首先,显然我们设计如下:
这样,对于 Worker
而言,我们就知道了
- 当前应该执行的是
Map
任务还是 Reduce
任务
- 任务的输入文件
但考虑我们的任务为 word count
, 我们的 Map
任务需要给出中间生成的文件,并且中间生成文件的命名为 mr-x-y
,其中 x
为这是第几个 Map
任务(也就是这是第几个输入文件),y
为相同键值所在的 Reduce
任务的编号,例如 y = ihash(key) % nReduce
意思为键为 Key
的Kv
应当让索引为 y
的 Reduce
来处理
在 work.go
中已给出了 ihash()
的实现,所以我们还需要的是 nReduce
的值,并且我们还需要一个数组来存储中间生成文件的文件名(因为是本地所以直接存文件名就好) ,因此我们二次设计如下:
这样我们做完了最初版本的设计,这样的 Task
就可以在 Worker
与 master
中传输了
Master
master
需要有以下功能:
-
在启动时根据指定的输入文件数及 Reduce Task
数,生成 Map Task
及 Reduce Task
-
响应 Worker
的 Task
申请 RPC
请求,分配可用的 Task
给到 Worker
处理
-
追踪 Task
的完成情况,在所有 Map Task
完成后进入 Reduce
阶段,开始派发 Reduce Task
;在所有 Reduce Task
完成后标记作业已完成并退出
由于我们需要对 Task
的完成情况进行追踪,并且需要给 Worker
任务,那么这里我们可以通过 map
与 chan
来实现这两个功能
首先,我们设计对 Task
任务的追踪:
这里对 MasterTask
的设计借鉴了原论文中的设计,包括 TaskStatus
与 MachineId
这样,我们就可以完成对 Task
的追踪,接下来就可以对 Master
的结构进行设计了:
mutex
为互斥锁,用于并发控制
nReduce
用以传送给每一个 Task
- 消息队列
TaskQueue
用以为 Worker
发送 Task
,由于 chan
自带锁,这样就避免了我们自己手写一个带锁的队列(
TaskMeta
为一个 map
,其键值为 index
,实际上就是文件的序号,当然在这里可以当作 Task
的索引,值为 MasterTask
,注意这个结构体实际上是包含了 Task
的
MasterPhase
记录了当前 MapReduce
任务已经进行到了什么阶段
InputFiles
记录了 Map
任务的输入文件
Intermediates
为一个文件列表,由于在前面我们提到,Map
任务会生成一系列中间文件 mr-X-Y
,其中 Y
为 Reduce
的编号,所以在这里,我们需要把中间文件存入这里,Intermediates[Y] = mr-i-Y
其中 0≤i<mapTask
我们在这里添加了 TaskMeta
之后,显然我们在 Task
中还需要一个值来记录,我们当前的 Task
在 Master
中所对应的元数据应该是什么,所以我们设计 Task
如下所示:
(我发誓这是最终版本了)
功能实现
首先最简单的功能,应该是 RPC
中 master
与 worker
之间的任务派发,我们从这一步开始写,但首先,我们需要明确在 master
初始化时需要完成的事情:
由于 RPC
的存在,因此我们需要设定请求参数与返回参数。
在这里,由于我们跟踪 Task
的完成情况,所以这就要求 Worker
需要向 Master
报告自己是否完成了上一个任务,Master
又会向 Worker
发送新的任务,因此这两步我们可以合成一个部分来看待。
于是我们可以设计请求参数与返回参数如下:
当然最后一行可以不写,直接将 reply
设置为 *Task
类型即可,一个简单的框架如下所示:
注意这里,我们将 DeadLine
设置为 10s
,这是官网的 Hint
中建议的,至于其用处,在后面其他功能的实现部分会解释
下面解释一下为什么要用 select
,在最开始的版本中,我没有使用 select
,是直接 task, ok := <- c.TaskQueue
,但遇到了代码执行完 Map
阶段后卡住不动,debug
后发现是因为在 TaskQueue
这个 chan
中的内容被取完后,进入了阻塞状态,只有此 chan
中有新任务添加才会被唤起。经过 GPT4
的解答后,采用了 select
进行阻塞的处理,如果阻塞,那么我们将 reply
设置为 WAIT
或 EXIT
来告诉 Worker
应该做什么样的反应
而对于提交任务的部分,我们需要考虑几个条件:
- 由于
Worker
获取到 Task
后可能出现宕机和卡死等情况,在这种情况下, Master
会将任务给其他 Worker
处理,发生故障的 Worker
所做的处理会作废
- 也是由于卡死,所以可能会出现提交的任务阶段滞后的情况,例如已经到了
Reduce
阶段,但此时提交上来的任务却还是 Map
的任务,类似这种我们也应该作废
所以,我们对任务判断应该写为:
如果检测通过,那么我们接受这次任务的提交,并将中间文件存在 Master
上(虽然这里做的处理只是将其路径存在 Master
中)
那么在 Worker
中,我们的结构可以这样:
接下来,我们来实现 Worker
中的 map
与 reduce
,首先是 map
阶段 注意定义 KeyValue
结构体
大部分都是从 mrsequential.go
中照搬来的代码,所以也没有特别多的解释(
比较值得注意的是,这里我们按照要求,模拟了一个 json
的数据传送
而对于 reduce
任务:
同样也是照抄 mrsequential.go
的代码(
于是在 worker
这边,我们的任务就已全部做完了。
而我们还有其他要求,即 Worker
长时间不与 master
通信了,为了简化任务,设定该超时阈值为 10s
即可。为了支持这一点,我们的实现需要支持到:
master
追踪已分配 Task
的运行情况,在 Task
超出 10s
仍未完成时,将该 Task
重新分配给其他 Worker
重试
- 考虑
Task
上一次分配的 Worker
可能仍在运行,重新分配后会出现两个 Worker
同时运行同一个 Task
的情况。要确保只有一个 Worker
能够完成结果数据的最终写出,以免出现冲突,导致下游观察到重复或缺失的结果数据
第二点会相对复杂些,不过在 Lab 文档中也给出了提示 —— 实际上也是参考了 Google MapReduce 的做法,Worker
在写出数据时可以先写出到临时文件,最终确认没有问题后再将其重命名为正式结果文件,区分开了 Write 和 Commit 的过程。Commit 的过程可以是 Master 来执行,也可以是 Worker 来执行:
- Master Commit:Worker 向 Master 汇报 Task 完成,Master 确认该 Task 是否仍属于该 Worker,是则进行结果文件 Commit,否则直接忽略
- Worker Commit:Worker 向 Master 汇报 Task 完成,Master 确认该 Task 是否仍属于该 Worker 并响应 Worker,是则 Worker 进行结果文件 Commit,再向 Master 汇报 Commit 完成
但实际上这里,我们并不需要区分的如此明确,因为 Worker
会将文件上传到 master
的磁盘中,所以我们可以直接让 worker
进行重命名,也就是我们上面所做的那样。
而对于第一点,我们采用 goroutine
来解决(实际上有点像自动垃圾回收?
我们需要考虑的点比较多:
- 该任务是否被分配
- 是否超过时限(
DDL
)
- 此任务当前的阶段是否与服务器的阶段不匹配(也就是是否为过期任务)
- 任务是否为
InProcess
如果都满足,那么我们将此任务重新加回队列中
我们还差最后一步尚未完成,也就是我们的服务器无法进行阶段转移:从 Map
到 Reduce
, 从 Reduce
到 Exit
并且我们也没有为 Reduce
阶段进行任务的创建
对于第一个,我们可以采用 goroutine
进行周期性的检测:
对于第二点,我们需要遍历中间生成文件,并创建相应数目的 Reduce
任务:
注意,上述的两个 goroutine
都放在 MakeCoordinator
执行即可
至此,我们的 MapReduce
框架就已完成,打开终端输入
进行测试即可
实验结果