A Practical Guide to Hadoop MapReduce

Having covered Hadoop installation and configuration, and introduced HDFS, we now turn to MapReduce in detail: Hadoop's core computation framework for distributed processing of large-scale datasets.
1. MapReduce Basics

MapReduce is a programming model for parallel processing of large datasets. Its core idea is to split a computation into two phases:

- Map: break the input data into key-value pairs and process each pair in parallel
- Reduce: group the Map output by key and aggregate the values
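The two phases can be illustrated without Hadoop at all. Below is a toy, single-process sketch of the word-count logic; the class and method names are illustrative, not part of any Hadoop API:

```java
import java.util.*;

// A toy, single-process illustration of the Map -> shuffle -> Reduce flow.
public class MiniMapReduce {

    // Map phase: each input line becomes (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) pairs.add(Map.entry(word, 1));
        }
        return pairs;
    }

    // Shuffle + Reduce phase: group pairs by key and sum the values.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            counts.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return counts;
    }

    public static Map<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<>();
        for (String line : lines) intermediate.addAll(map(line));
        return reduce(intermediate);
    }

    public static void main(String[] args) {
        // prints {hadoop=1, hello=2, world=1}
        System.out.println(wordCount(List.of("hello world", "hello hadoop")));
    }
}
```

A real MapReduce job follows the same shape, except the map and reduce calls run on different machines and the shuffle happens over the network.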
2. Modern MapReduce Programming Examples

2.1 The Classic WordCount Example (Hadoop 3.x)
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
2.2 Compile and Run

```shell
javac -cp "$(hadoop classpath)" WordCount.java
jar cf wc.jar WordCount*.class

hadoop jar wc.jar WordCount /input /output
```
3. How MapReduce Works

3.1 Execution Flow

- Input Split: the input data is divided into splits
- Map phase: each split is processed by one Mapper, which emits intermediate key-value pairs
- Shuffle phase: intermediate results are grouped by key and routed to the appropriate Reducer
- Reduce phase: each Reducer processes its group of key-value pairs and produces the final result
- Output: results are written to HDFS

3.2 Core Components
| Component | Responsibility | Example Implementations |
|---|---|---|
| InputFormat | Splits the input data into InputSplits | TextInputFormat, SequenceFileInputFormat |
| Mapper | Processes input records and emits intermediate key-value pairs | TokenizerMapper |
| Combiner | Locally merges intermediate results to reduce network transfer | IntSumReducer |
| Partitioner | Assigns intermediate keys to Reducers | HashPartitioner |
| Reducer | Merges intermediate results into the final output | IntSumReducer |
| OutputFormat | Writes results to HDFS | TextOutputFormat, SequenceFileOutputFormat |
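Of these, the Partitioner is the simplest to demystify: the default HashPartitioner just maps a key's hash code to one of the reducers. Its core rule, reproduced here outside the Hadoop API for illustration:

```java
// The partitioning rule used by Hadoop's default HashPartitioner:
// mask off the sign bit, then take the hash modulo the reducer count,
// so equal keys always land on the same reducer.
public class HashPartitionDemo {

    static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // The same key maps to the same partition on every call.
        System.out.println(getPartition("hello", 4));
        System.out.println(getPartition("hello", 4));
    }
}
```

This determinism is what guarantees that all values for a given key arrive at a single Reducer.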
4. Advanced MapReduce Programming

4.1 Custom Data Types
```java
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CustomWritable implements Writable {
    private int count;
    private double average;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(count);
        out.writeDouble(average);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        count = in.readInt();
        average = in.readDouble();
    }
}
```
4.2 Multiple Inputs

```java
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Feed two input paths into the same job, each handled by its own Mapper.
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, Mapper1.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, Mapper2.class);
```
4.3 Using Counters

```java
// Inside a Mapper or Reducer: increment a user-defined counter.
context.getCounter("Custom Counters", "Processed Records").increment(1);
```

Read the counter back from the command line (in Hadoop 3.x, `mapred job` replaces the deprecated `hadoop job`):

```shell
mapred job -counter <job-id> "Custom Counters" "Processed Records"
```
5. MapReduce Best Practices

5.1 Performance Tuning

- Tune the split size: the default is 128 MB (one HDFS block); adjust it to the characteristics of your data
- Use a Combiner: reduces the volume of data shuffled over the network
- Set a sensible number of Reducers: typically one to two per node
- Enable compression: reduces data transfer and storage
- Avoid data skew: use a custom Partitioner
- Optimize serialization: implement WritableComparable for keys
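As an example of the compression point, map-output and job-output compression are enabled in the driver. The following is a sketch assuming Hadoop 3.x property names and an available Snappy codec; verify both against your distribution:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressionConfig {

    public static Job configureCompressedJob() throws Exception {
        Configuration conf = new Configuration();

        // Compress intermediate map output to cut shuffle traffic.
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
                      SnappyCodec.class, CompressionCodec.class);

        Job job = Job.getInstance(conf, "compressed job");

        // Compress the final job output as well.
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
        return job;
    }
}
```

Map-output compression is usually the bigger win, because the shuffle crosses the network while the final output is written once.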
5.2 Common Problems and Solutions

| Problem | Cause | Solutions |
|---|---|---|
| Data skew | a few keys occur far more often than others | 1. use a custom Partitioner 2. handle hot keys specially 3. increase the number of Reducers |
| OOM errors | insufficient memory | 1. increase JVM heap size 2. optimize data structures 3. reduce the data volume per task |
| Task failures | network or hardware problems | 1. increase the task retry count 2. harden the code 3. validate the input data format |
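One common way to handle hot keys specially is key salting: append a random suffix so a single hot key spreads across several reducers, then strip the suffix and merge the partial results in a second pass. A plain-Java sketch of the salting step (class and method names are illustrative):

```java
import java.util.Random;

// Spread a known hot key over N reducer buckets by appending a salt suffix.
// A second aggregation pass strips the suffix and merges the partial sums.
public class KeySalting {
    private static final Random RANDOM = new Random();

    static String salt(String key, int buckets) {
        return key + "#" + RANDOM.nextInt(buckets);
    }

    // Strip the salt when merging partial results in the second pass.
    static String unsalt(String saltedKey) {
        int idx = saltedKey.lastIndexOf('#');
        return idx >= 0 ? saltedKey.substring(0, idx) : saltedKey;
    }

    public static void main(String[] args) {
        String salted = salt("hot-key", 8);
        System.out.println(salted + " -> " + unsalt(salted));
    }
}
```

The trade-off is an extra aggregation pass, so salting only pays off when a handful of keys dominate the shuffle.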
6. MapReduce and Modern Big Data Technologies

6.1 MapReduce vs. Spark

| Feature | MapReduce | Spark |
|---|---|---|
| Processing model | batch | batch + streaming |
| Intermediate results | written to disk | cached in memory |
| Execution speed | slower | up to 10-100x faster |
| Ease of use | lower | high (multiple language APIs) |
| Ecosystem | mature | rich |
6.2 Typical MapReduce Use Cases

- Large-scale batch processing: log analysis, data ETL
- Machine learning: model training, feature extraction
- Data mining: pattern recognition, association analysis
- Scientific computing: genome sequencing, weather analysis

7. Real-World Examples

7.1 Log Analysis
Count requests per client IP, assuming Common Log Format, where the IP is the first whitespace-delimited field:

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogAnalyzer {

    public static class LogMapper extends Mapper<Object, Text, Text, IntWritable> {
        private Text ip = new Text();
        private final static IntWritable one = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String ipAddress = parseIpAddress(value.toString());
            if (ipAddress != null) {
                ip.set(ipAddress);
                context.write(ip, one);
            }
        }

        // In the Common Log Format the client IP is the first field.
        private String parseIpAddress(String line) {
            String[] fields = line.trim().split("\\s+");
            return (fields.length > 0 && !fields[0].isEmpty()) ? fields[0] : null;
        }
    }
}
```
7.2 Data Aggregation

Sum a numeric value per category from CSV input of the form `category,value`:

```java
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class DataAggregator {

    public static class AggregateMapper extends Mapper<Object, Text, Text, DoubleWritable> {
        private Text category = new Text();
        // Named "amount" so it is not shadowed by the map() parameter "value".
        private DoubleWritable amount = new DoubleWritable();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split(",");
            if (parts.length >= 2) {
                category.set(parts[0]);
                amount.set(Double.parseDouble(parts[1]));
                context.write(category, amount);
            }
        }
    }

    public static class AggregateReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        private DoubleWritable result = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            for (DoubleWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}
```
8. MapReduce Tools and Ecosystem

8.1 Common Tools

- Hadoop Streaming: write MapReduce jobs in any language that can read stdin and write stdout
- Hadoop Pipes: write MapReduce jobs in C++
- Oozie: workflow scheduling for Hadoop jobs
- Hive: SQL interface that compiles queries into MapReduce jobs
- Pig: dataflow language that compiles scripts into MapReduce jobs

8.2 Monitoring and Debugging

- Hadoop Web UI: inspect job status and counters
- YARN ResourceManager: resource management and job scheduling
- Log analysis: read task logs to locate failures
- Counters: track job progress and data quality