Hadoop MapReduce 实战指南

前面进行了Hadoop配置安装
以及了解了HDFS

接下来详细介绍MapReduce,这是Hadoop的核心计算框架,用于处理大规模数据集的分布式计算。

1. MapReduce 基本概念

MapReduce是一种编程模型,用于大规模数据集的并行处理。它的核心思想是将计算过程分为两个阶段:

  • Map(映射):将输入数据分解成若干个键值对,并行处理每个键值对
  • Reduce(归约):将Map阶段的结果按键合并,进行汇总计算

2. 现代MapReduce编程示例

2.1 经典WordCount示例(Hadoop 3.x)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

2.2 编译和运行

1
2
3
4
5
6
# 编译
javac -cp "$(hadoop classpath)" WordCount.java
jar cf wc.jar WordCount*.class

# 运行
hadoop jar wc.jar WordCount /input /output

3. MapReduce 工作原理

3.1 执行流程

  1. 输入分片(Input Split):将输入数据分割成多个分片
  2. Map阶段:每个分片由一个Mapper处理,生成中间键值对
  3. Shuffle阶段:将中间结果按键分组,发送到对应的Reducer
  4. Reduce阶段:每个Reducer处理一组键值对,生成最终结果
  5. 输出:将结果写入HDFS

3.2 核心组件

组件 职责 示例实现
InputFormat 将输入数据分割成InputSplit TextInputFormat, SequenceFileInputFormat
Mapper 处理输入数据,生成中间键值对 TokenizerMapper
Combiner 局部合并中间结果,减少网络传输 IntSumReducer
Partitioner 将中间结果分配给不同的Reducer HashPartitioner
Reducer 合并中间结果,生成最终输出 IntSumReducer
OutputFormat 将结果写入HDFS TextOutputFormat, SequenceFileOutputFormat

4. 高级MapReduce编程

4.1 使用自定义数据类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.hadoop.io.Writable;

public class CustomWritable implements Writable {
private int count;
private double average;

// 构造函数、getter、setter方法

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(count);
out.writeDouble(average);
}

@Override
public void readFields(DataInput in) throws IOException {
count = in.readInt();
average = in.readDouble();
}
}

4.2 使用MultipleInputs

1
2
3
4
5
6
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// 为不同的输入路径指定不同的Mapper
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, Mapper1.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, Mapper2.class);

4.3 使用计数器

1
2
3
4
5
// 在Mapper或Reducer中
context.getCounter("Custom Counters", "Processed Records").increment(1);

// 作业完成后查看计数器
hadoop job -counter <job-id> "Custom Counters" "Processed Records"

5. MapReduce 最佳实践

5.1 性能优化

  1. 合理设置分片大小:默认128MB,可根据数据特性调整
  2. 使用Combiner:减少网络传输数据量
  3. 合理设置Reducer数量:一般为节点数的1-2倍
  4. 使用压缩:减少数据传输和存储
  5. 避免数据倾斜:使用自定义Partitioner
  6. 优化数据序列化:使用WritableComparable

5.2 常见问题及解决方案

问题 原因 解决方案
数据倾斜 某些键出现频率过高 1. 使用自定义Partitioner
2. 对倾斜键进行特殊处理
3. 增加Reducer数量
OOM错误 内存不足 1. 增加JVM内存
2. 优化数据结构
3. 减少单个任务处理的数据量
任务失败 网络或硬件问题 1. 增加任务重试次数
2. 优化代码健壮性
3. 检查输入数据格式

6. MapReduce 与现代大数据技术

6.1 MapReduce vs Spark

特性 MapReduce Spark
处理模型 批处理 批处理 + 流处理
中间结果 写入磁盘 内存中缓存
执行速度 较慢 快10-100倍
易用性 较低 高(支持多种语言)
生态系统 成熟 丰富

6.2 MapReduce 的应用场景

  • 大规模批处理:日志分析、数据ETL
  • 机器学习:训练模型、特征提取
  • 数据挖掘:模式识别、关联分析
  • 科学计算:基因测序、气象分析

7. 实际应用示例

7.1 日志分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class LogAnalyzer {
public static class LogMapper extends Mapper<Object, Text, Text, IntWritable> {
private Text ip = new Text();
private final static IntWritable one = new IntWritable(1);

@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
// 解析日志行,提取IP地址
String ipAddress = parseIpAddress(line);
if (ipAddress != null) {
ip.set(ipAddress);
context.write(ip, one);
}
}

private String parseIpAddress(String line) {
// 解析IP地址的逻辑
return "192.168.1.1"; // 示例
}
}

// Reducer和main方法类似WordCount
}

7.2 数据聚合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class DataAggregator {
public static class AggregateMapper extends Mapper<Object, Text, Text, DoubleWritable> {
private Text category = new Text();
private DoubleWritable value = new DoubleWritable();

@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] parts = line.split(",");
if (parts.length >= 2) {
category.set(parts[0]);
value.set(Double.parseDouble(parts[1]));
context.write(category, value);
}
}
}

public static class AggregateReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();

@Override
protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
double sum = 0;
for (DoubleWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

// main方法类似WordCount
}

8. MapReduce 工具和生态系统

8.1 常用工具

  • Hadoop Streaming:使用任意语言编写MapReduce作业
  • Hadoop Pipes:使用C++编写MapReduce作业
  • Oozie:工作流调度系统
  • Hive:SQL接口,将SQL转换为MapReduce作业
  • Pig:数据流语言,将脚本转换为MapReduce作业

8.2 监控和调试

  • Hadoop Web UI:查看作业状态和计数器
  • YARN ResourceManager:资源管理和作业调度
  • 日志分析:查看作业日志定位问题
  • 计数器:监控作业执行情况

9. 参考资料


参考资料:

http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop2/

http://www.infoq.com/cn/articles/MapReduce-Best-Practice-1

http://hadoop.apache.org/docs/r0.19.1/cn/mapred_tutorial.html#%E7%9B%AE%E7%9A%84

http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html

http://blog.csdn.net/kauu/article/details/1815353

http://sishuok.com/forum/blogPost/list/0/5965.html