FXJ Wiki

Back

你需要实现的两个文件#

Lab 1 要求你实现:

  • labs/src/mr/master.go:Master(协调者)
  • labs/src/mr/worker.go:Worker(执行者)

框架代码已经给了你:

  • labs/src/mr/rpc.go:RPC 消息定义(你需要在这里添加自己的消息类型)
  • labs/src/labrpc/:RPC 框架(不要改)
  • labs/src/main/mrmaster.go:启动 Master 的入口(不要改)
  • labs/src/main/mrworker.go:启动 Worker 的入口(不要改)

系统的执行流程#

1. 启动 Master
   Master 读取输入文件列表,创建 Map 任务

2. 启动多个 Worker
   每个 Worker 循环:
     → 向 Master 请求任务(RPC)
     → 执行任务(Map 或 Reduce)
     → 向 Master 报告完成(RPC)

3. Map 阶段
   每个 Map 任务处理一个输入文件
   输出中间文件:mr-X-Y(X=Map任务号,Y=Reduce分区号)

4. 所有 Map 完成后,进入 Reduce 阶段
   每个 Reduce 任务处理所有 mr-*-Y 文件(Y相同)
   输出最终文件:mr-out-Y

5. 所有 Reduce 完成后,Master 告诉 Worker 退出
plaintext

中间文件的命名规则#

Map 任务 X 的输出要分成 NReduce 个文件:

mr-0-0, mr-0-1, mr-0-2, ...  (Map 任务 0 的输出)
mr-1-0, mr-1-1, mr-1-2, ...  (Map 任务 1 的输出)
plaintext

ihash(key) % NReduce 决定一个 key 属于哪个 Reduce 分区:

func ihash(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32() & 0x7fffffff)
}

// 决定 key 属于哪个 Reduce 任务
reduceTaskNum := ihash(key) % nReduce
go

Master 需要维护的状态#

type Master struct {
    mu          sync.Mutex
    mapTasks    []MapTask     // Map 任务列表
    reduceTasks []ReduceTask  // Reduce 任务列表
    nReduce     int           // Reduce 任务数量
    phase       string        // "map" 或 "reduce" 或 "done"
}

type MapTask struct {
    ID        int
    InputFile string
    Status    string    // "idle", "running", "done"
    StartTime time.Time
}

type ReduceTask struct {
    ID        int
    Status    string
    StartTime time.Time
}
go

RPC 消息设计#

你需要在 rpc.go 里添加:

// Worker 请求任务
type GetTaskArgs struct{}

type GetTaskReply struct {
    TaskType  string  // "map", "reduce", "wait", "exit"
    TaskID    int
    NReduce   int
    NMap      int     // Reduce 任务需要知道有多少个 Map 任务
    InputFile string  // Map 任务的输入文件
}

// Worker 报告任务完成
type ReportTaskArgs struct {
    TaskType string
    TaskID   int
}

type ReportTaskReply struct{}
go

测试脚本做了什么#

bash test-mr.sh 会依次运行:

  1. wc 测试:词频统计,验证基本正确性
  2. indexer 测试:倒排索引,验证复杂 key/value 处理
  3. map parallelism 测试:验证 Map 任务是并行执行的
  4. reduce parallelism 测试:验证 Reduce 任务是并行执行的
  5. crash 测试:Worker 随机崩溃,验证容错性

每个测试都会先用 mrsequential(单机顺序执行)生成正确答案,然后用你的分布式实现跑,最后对比结果。


实现顺序建议#

  1. 先实现 RPC 消息定义(rpc.go
  2. 实现 Master 的任务分配逻辑(master.go
  3. 实现 Worker 的 Map 执行逻辑(worker.go
  4. 测试 wc 测试通过
  5. 实现 Worker 的 Reduce 执行逻辑
  6. 测试 wc 和 indexer 通过
  7. 实现 Master 的超时检测(10 秒超时,重新分配任务)
  8. 测试 crash 测试通过

不要一次性写完所有代码再测试,每实现一个功能就测试一次。