FXJ Wiki

Back

  1. 海量数据:需要处理 TB 级别的网页数据
  2. 复杂任务:构建搜索索引、分析网页链接、统计词频等
  3. 分布式困难:手动编写分布式程序太复杂,容易出错

传统方法的问题

任务:统计 1TB 网页中每个单词的出现次数

方法 1:单机处理
- 需要几天时间
- 内存不够,需要频繁读写磁盘
- 不可行

方法 2:手动分布式
- 需要手动分割数据
- 需要手动处理网络通信
- 需要手动处理机器故障
- 代码复杂,容易出错
plaintext

MapReduce 的解决方案

  • 程序员只需要写 Map 和 Reduce 函数
  • 框架自动处理并行化、容错、数据分发
  • 简单、高效、可靠

1.2 MapReduce 的核心思想#

抽象:把大部分数据处理任务抽象成两个操作

  1. Map:处理输入数据,产生中间键值对
  2. Reduce:合并相同 key 的所有值

为什么这个抽象有效?

很多任务都可以用这个模式表达:

  • 词频统计:Map 输出 (word, 1),Reduce 求和
  • 倒排索引:Map 输出 (word, docID),Reduce 收集所有 docID
  • 分布式排序:Map 输出 (key, value),Reduce 排序
  • 网页链接分析:Map 输出 (target, source),Reduce 统计入链

第二部分:MapReduce 编程模型#

2.1 基本概念#

输入:一组键值对 输出:一组键值对

Map 函数

map(k1, v1) → list(k2, v2)
plaintext
  • 输入:一个键值对 (k1, v1)
  • 输出:一组中间键值对 [(k2, v2), …]

Reduce 函数

reduce(k2, list(v2)) → list(v2)
plaintext
  • 输入:一个 key 和它的所有 value
  • 输出:合并后的结果

2.2 词频统计示例#

任务:统计文档中每个单词的出现次数

输入数据

doc1: "hello world"
doc2: "hello mapreduce"
doc3: "world peace"
plaintext

Map 函数

def map(document_name, document_content):
    # 输入:(doc1, "hello world")
    # 输出:[("hello", 1), ("world", 1)]
    for word in document_content.split():
        emit(word, 1)
python

Map 阶段输出

Map worker 1: [("hello", 1), ("world", 1)]
Map worker 2: [("hello", 1), ("mapreduce", 1)]
Map worker 3: [("world", 1), ("peace", 1)]
plaintext

Shuffle 阶段(框架自动完成):

"hello" → [1, 1]
"world" → [1, 1]
"mapreduce" → [1]
"peace" → [1]
plaintext

Reduce 函数

def reduce(word, counts):
    # 输入:("hello", [1, 1])
    # 输出:("hello", 2)
    emit(word, sum(counts))
python

最终输出

hello 2
world 2
mapreduce 1
peace 1
plaintext

2.3 更多示例#

示例 1:分布式 Grep

任务:在大量文件中查找包含特定模式的行

def map(filename, content):
    for line in content.split('\n'):
        if pattern in line:
            emit(line, 1)

def reduce(line, counts):
    emit(line, 1)  # 只输出一次
python

示例 2:URL 访问频率统计

任务:统计每个 URL 被访问的次数

def map(log_line, _):
    url = extract_url(log_line)
    emit(url, 1)

def reduce(url, counts):
    emit(url, sum(counts))
python

示例 3:倒排索引

任务:构建单词到文档的索引

def map(document_id, content):
    for word in content.split():
        emit(word, document_id)

def reduce(word, document_ids):
    emit(word, unique(document_ids))
python

第三部分:MapReduce 执行流程#

3.1 系统架构#

[Master]
   |
   |--- 分配任务 --->  [Worker 1] (Map)
   |                   [Worker 2] (Map)
   |                   [Worker 3] (Map)
   |
   |--- 分配任务 --->  [Worker 4] (Reduce)
   |                   [Worker 5] (Reduce)
plaintext

角色

  • Master:协调者,分配任务,监控进度
  • Worker:执行者,运行 Map 或 Reduce 任务

3.2 详细执行流程#

第 1 步:分割输入

输入文件:1GB
分割成:64 个 16MB 的块(split)
每个 split 对应一个 Map 任务
plaintext

第 2 步:启动 Master 和 Workers

Master:1 个
Workers:M 个(执行 Map)+ R 个(执行 Reduce)
plaintext

第 3 步:Map 阶段

1. Master 分配 Map 任务给空闲的 Worker
2. Worker 读取对应的 input split
3. Worker 调用用户的 Map 函数
4. Worker 把 Map 输出写入本地磁盘
5. Worker 把输出分成 R 份(根据 key 的 hash 值)
6. Worker 通知 Master 任务完成
plaintext

第 4 步:Shuffle 阶段

1. Master 通知 Reduce Worker 中间文件的位置
2. Reduce Worker 通过 RPC 读取中间文件
3. Reduce Worker 对中间数据按 key 排序
plaintext

第 5 步:Reduce 阶段

1. Master 分配 Reduce 任务给空闲的 Worker
2. Worker 读取所有对应的中间文件
3. Worker 对每个 key 调用用户的 Reduce 函数
4. Worker 把 Reduce 输出写入最终输出文件
5. Worker 通知 Master 任务完成
plaintext

第 6 步:完成

所有任务完成后,Master 唤醒用户程序
用户程序可以读取 R 个输出文件
plaintext

3.3 数据流图#

输入文件
   |
   v
[Split 0] [Split 1] [Split 2]
   |         |         |
   v         v         v
[Map 0]   [Map 1]   [Map 2]
   |         |         |
   +----+----+----+----+
        |         |
        v         v
   [中间文件 0-0] [中间文件 0-1]
   [中间文件 1-0] [中间文件 1-1]
   [中间文件 2-0] [中间文件 2-1]
        |         |
        v         v
   [Reduce 0] [Reduce 1]
        |         |
        v         v
   [输出 0]   [输出 1]
plaintext

第四部分:容错机制#

4.1 Worker 故障#

问题:Worker 可能在执行任务时崩溃

解决方案

  1. 心跳检测

    • Master 定期 ping Worker
    • 如果 Worker 没有响应,标记为失败
  2. 任务重新执行

    • 失败的 Map 任务:重新分配给其他 Worker
    • 失败的 Reduce 任务:重新分配给其他 Worker
  3. 已完成任务的处理

    • Map 任务:即使已完成,如果 Worker 失败,也需要重新执行
      • 原因:Map 输出存储在 Worker 的本地磁盘,Worker 失败后无法访问
    • Reduce 任务:已完成的不需要重新执行
      • 原因:Reduce 输出存储在全局文件系统(GFS),仍然可以访问

示例

时间线:
t1: Worker A 开始执行 Map 任务 1
t2: Worker A 完成 Map 任务 1,输出写入本地磁盘
t3: Worker A 崩溃
t4: Master 检测到 Worker A 失败
t5: Master 重新分配 Map 任务 1 给 Worker B
t6: Worker B 重新执行 Map 任务 1
plaintext

4.2 Master 故障#

问题:Master 崩溃怎么办?

论文中的解决方案

  • Master 定期写 checkpoint
  • Master 崩溃后,从最近的 checkpoint 恢复
  • 实际上,Google 的实现中 Master 很少崩溃,所以简单地让整个任务失败

为什么 Master 故障不是大问题?

  • Master 只有 1 个,故障概率低
  • Worker 有很多个,故障概率高
  • 重点优化 Worker 故障处理

4.3 语义保证#

问题:如果任务被重新执行,会不会产生重复的输出?

MapReduce 的保证

  1. 确定性函数:如果 Map 和 Reduce 函数是确定性的(相同输入产生相同输出),那么 MapReduce 保证输出与顺序执行相同

  2. 原子提交

    • Map 任务完成时,原子地重命名临时文件为最终文件
    • Reduce 任务完成时,原子地重命名临时文件为最终文件
    • 如果任务被重新执行,旧的输出会被覆盖
  3. 非确定性函数:如果函数是非确定性的,MapReduce 不保证输出一致性

    • 例如:Map 函数输出随机数
    • 这种情况下,程序员需要自己处理

第五部分:优化技术#

5.1 局部性优化#

问题:网络传输很慢,如何减少网络流量?

解决方案:数据局部性(Locality)

  • GFS 把文件分成 64MB 的块,每个块有 3 个副本
  • Master 尽量把 Map 任务分配给存储了输入数据的机器
  • 如果不行,分配给同一机架的机器

效果

  • 大部分输入数据从本地磁盘读取,不需要网络传输
  • 节省网络带宽,提高性能

5.2 任务粒度#

问题:应该有多少个 Map 和 Reduce 任务?

论文建议

  • M(Map 任务数)远大于 Worker 数
  • R(Reduce 任务数)是 Worker 数的小倍数

原因

  • M 很大:提高负载均衡,加快故障恢复
  • R 适中:每个 Reduce 产生一个输出文件,太多文件不方便

示例

Worker 数:2000
M:200,000(每个 Worker 平均 100 个 Map 任务)
R:5,000(每个 Worker 平均 2-3 个 Reduce 任务)
plaintext

5.3 备份任务(Backup Tasks)#

问题:有些 Worker 特别慢(“straggler”),拖慢整个任务

原因

  • 磁盘故障(读取速度慢)
  • CPU 被其他任务占用
  • 网络拥塞

解决方案

  • 当 MapReduce 接近完成时(例如 95% 的任务完成)
  • 对剩余的任务启动备份执行
  • 哪个先完成就用哪个的结果

效果

  • 论文中的例子:减少 44% 的执行时间

5.4 Combiner 函数#

问题:某些任务产生大量重复的中间数据

示例:词频统计

Map 输出:[("the", 1), ("the", 1), ("the", 1), ...]
网络传输:大量重复的 ("the", 1)
plaintext

解决方案:Combiner 函数

  • 在 Map Worker 本地先做一次合并
  • 减少网络传输
# Map 输出
[("the", 1), ("the", 1), ("the", 1)]

# Combiner 处理(在 Map Worker 本地)
[("the", 3)]

# 网络传输
只传输 ("the", 3),而不是 3 个 ("the", 1)
python

注意:Combiner 函数必须与 Reduce 函数相同(或兼容)

第六部分:性能分析#

6.1 论文中的实验#

实验环境

  • 1800 台机器
  • 每台机器:2 个 2GHz CPU,4GB 内存,2 个 160GB 磁盘
  • 网络:千兆以太网

实验 1:Grep

  • 任务:在 1TB 数据中查找特定模式
  • 输入:1TB(10^10 条记录)
  • 输出:约 90,000 条匹配记录
  • 时间:150 秒
  • 吞吐量:约 6.7 GB/s

实验 2:排序

  • 任务:对 1TB 数据排序
  • 输入:1TB(10^10 条记录)
  • 输出:1TB(排序后)
  • 时间:891 秒
  • 吞吐量:约 1.1 GB/s(比 Grep 慢,因为需要写入大量数据)

6.2 性能瓶颈#

网络带宽

  • 输入数据从 GFS 读取
  • 中间数据通过网络传输
  • 输出数据写入 GFS
  • 网络是主要瓶颈

磁盘 I/O

  • Map 输出写入本地磁盘
  • Reduce 读取中间文件
  • 磁盘 I/O 也是瓶颈

CPU

  • 对于简单任务(如 Grep),CPU 不是瓶颈
  • 对于复杂任务(如图像处理),CPU 可能是瓶颈

6.3 扩展性#

问题:增加机器数量,性能能提升多少?

实验结果

  • 200 台机器:1746 秒
  • 400 台机器:891 秒
  • 800 台机器:???(论文未给出)

观察

  • 机器数量翻倍,时间减半(接近线性扩展)
  • 说明 MapReduce 的并行化很有效

第七部分:MapReduce 的应用#

7.1 Google 内部的应用#

搜索索引

  • 构建倒排索引
  • 处理数十亿网页
  • 每月运行多次

网页图分析

  • 计算 PageRank
  • 分析网页链接结构

日志分析

  • 分析用户行为
  • 检测异常流量

机器学习

  • 训练大规模模型
  • 处理训练数据

7.2 MapReduce 的局限性#

不适合的场景

  1. 需要低延迟的任务

    • MapReduce 启动慢(需要分配资源)
    • 适合批处理,不适合实时查询
  2. 需要迭代的任务

    • 每次迭代都需要重新启动 MapReduce
    • 中间结果需要写入磁盘
    • 效率低
  3. 需要随机访问的任务

    • MapReduce 是顺序处理
    • 不支持随机读写
  4. 需要复杂通信的任务

    • Worker 之间不能直接通信
    • 只能通过 Map → Shuffle → Reduce

替代方案

  • 实时查询:Dremel, BigQuery
  • 迭代计算:Spark
  • 图计算:Pregel
  • 流处理:MillWheel, Dataflow

第八部分:MapReduce vs Lab 1#

8.1 相同点#

核心概念

  • Map 和 Reduce 抽象
  • Master-Worker 架构
  • 任务分配和监控
  • 容错机制(超时重试)

数据流

  • 输入 → Map → 中间文件 → Reduce → 输出
  • 中间文件按 key 分区

8.2 不同点#

1. 存储系统

  • 论文:使用 GFS(分布式文件系统)
  • Lab 1:使用本地文件系统

2. 网络通信

  • 论文:Worker 之间通过 RPC 传输中间数据
  • Lab 1:通过共享文件系统传输

3. 容错机制

  • 论文:心跳检测 + 任务重新执行
  • Lab 1:超时检测 + 任务重新执行

4. 规模

  • 论文:数千台机器,TB 级数据
  • Lab 1:单机模拟,MB 级数据

8.3 Lab 1 的简化#

简化 1:单机模拟

  • 所有 Worker 运行在同一台机器上
  • 通过进程模拟分布式环境

简化 2:共享文件系统

  • 不需要实现网络传输
  • 直接读写本地文件

简化 3:简化的容错

  • 只处理 Worker 故障
  • 不处理 Master 故障

简化 4:固定的任务数

  • Map 任务数 = 输入文件数
  • Reduce 任务数 = nReduce 参数

第九部分:关键设计决策#

9.1 为什么 Map 输出写入本地磁盘?#

问题:为什么不直接通过网络发送给 Reduce Worker?

原因

  1. 容错:如果 Reduce Worker 失败,Map 输出仍然保存在磁盘上
  2. 解耦:Map 和 Reduce 可以独立执行
  3. 负载均衡:可以等所有 Map 完成后再分配 Reduce 任务

9.2 为什么需要 Shuffle 阶段?#

问题:为什么不让 Map Worker 直接把数据发送给 Reduce Worker?

原因

  1. 聚合:相同 key 的数据可能来自不同的 Map Worker
  2. 排序:Reduce 需要按 key 顺序处理
  3. 效率:批量传输比逐个传输更高效

9.3 为什么 Reduce 输出写入 GFS?#

问题:为什么不写入本地磁盘?

原因

  1. 持久性:GFS 有多个副本,不会因为机器故障丢失
  2. 可访问性:用户程序可以从任何机器访问输出
  3. 容错:如果 Reduce Worker 失败,已完成的输出不需要重新计算

9.4 为什么需要原子提交?#

问题:为什么使用临时文件 + 重命名?

原因

  1. 一致性:避免部分写入的文件被读取
  2. 容错:如果 Worker 崩溃,不会产生损坏的文件
  3. 幂等性:任务重新执行时,旧的输出会被覆盖

第十部分:论文的影响#

10.1 开源实现#

Hadoop

  • Apache 开源项目
  • Java 实现的 MapReduce
  • 包含 HDFS(类似 GFS)
  • 广泛应用于工业界

其他实现

  • Disco(Python)
  • Phoenix(C++)
  • Mars(GPU)

10.2 后续发展#

Spark

  • 内存计算,比 MapReduce 快 10-100 倍
  • 支持迭代计算
  • 支持交互式查询

Flink

  • 流处理框架
  • 支持事件时间处理
  • 支持状态管理

Beam

  • 统一的编程模型
  • 支持批处理和流处理
  • 可以运行在多个执行引擎上

10.3 MapReduce 的遗产#

核心思想

  • 简单的编程模型
  • 自动并行化
  • 容错机制
  • 数据局部性

影响

  • 推动了大数据处理的发展
  • 启发了后续的分布式计算框架
  • 成为分布式系统的经典案例

总结#

MapReduce 论文的核心贡献:

  1. 简单的编程模型:Map 和 Reduce 两个函数就能表达大部分数据处理任务
  2. 自动并行化:框架自动处理并行执行,程序员不需要关心细节
  3. 容错机制:自动处理机器故障,保证任务最终完成
  4. 可扩展性:可以扩展到数千台机器,处理 TB 级数据

学习要点

  • 理解 Map 和 Reduce 的抽象
  • 理解数据流和执行流程
  • 理解容错机制(任务重新执行)
  • 理解优化技术(局部性、备份任务、Combiner)

与 Lab 1 的联系

  • Lab 1 是 MapReduce 的简化实现
  • 核心概念相同,但规模和复杂度降低
  • 通过实现 Lab 1,你将深刻理解 MapReduce 的工作原理

阅读完这份指南后,建议:

  1. 阅读原始论文,加深理解
  2. 实现 Lab 1,动手实践
  3. 思考 MapReduce 的局限性和改进方向