MapReducecore concepts and programmingmodel

深入LearningMapReduce core concepts, architecturedesign, working principles and programmingmodel, MasterMapReduce Development and application

MapReducecore concepts and programmingmodel

1. MapReduceIntroduction

MapReduce is adistributed计算model, 用于processinglarge-scaledata集. 它 core思想 is 将计算task分解 for 两个阶段: Map (map) and Reduce (归约) , 这两个阶段可以 in clusterin many 个node on parallel执行.

1.1 MapReduce 起sources

MapReduce concepts来sources于function式programminglanguagein map and reducefunction. 2004年, Google发表了《MapReduce: Simplified Data Processing on Large Clusters》论文, 介绍了MapReduce design思想 and implementation细节. 随 after , Doug Cuttingetc.人基于这篇论文implementation了Hadoop MapReduce.

1.2 MapReduce design目标

  • simple 易用: providing simple programmingmodel, 让Development者able to专注于业务逻辑, 而不需要关心distributed计算 细节
  • high reliability: throughfault tolerancemechanism, 确保 in nodefailure时taskable to继续执行
  • high scale性: 可以through添加node来improvingprocessingcapacity
  • high throughput: 适合processinglarge-scaledata集 批量processing

2. MapReducecore concepts

2.1 Map阶段

Map阶段 is MapReduce 第一个阶段, 它 主要functions is 将输入data转换 for in间键值 for . in Map阶段, 每个Maptaskprocessing输入data 一个shard (Split) , 并生成一系列in间键值 for .

2.2 Reduce阶段

Reduce阶段 is MapReduce 第二个阶段, 它 主要functions is 将Map阶段生成 in间键值 for formerge and 汇总. in Reduce阶段, 每个Reducetaskprocessing具 has 相同键 所 has in间值, 并生成最终 输出结果.

2.3 键值 for (Key-Value Pair)

MapReduce 计算过程基于键值 for , 输入 and 输出都 is 键值 for 形式.

  • 输入键值 for : 输入data被分割成一系列键值 for , serving asMapfunction 输入
  • in间键值 for : Mapfunction 输出 is in间键值 for , 这些键值 for 会被传递给Reducefunction
  • 输出键值 for : Reducefunction 输出 is 最终 键值 for , serving as整个MapReducejob 结果

2.4 shard (Split)

shard is MapReducein输入data basic单位. in HDFSin, file被分割成固定 big small data块 (默认128MB) , 而 in MapReducein, 输入data被分割成shard, 每个shard for 应一个Maptask.

2.5 partition (Partition)

partition is 将in间键值 for 分配给不同Reducetask 过程. 默认circumstances under , MapReduceusing哈希function for in间键forpartition, 确保具 has 相同键 in间值被分配给同一个Reducetask.

2.6 sort (Sort)

in Map阶段 and Reduce阶段之间, MapReduce会 for in间键值 for forsort, 确保具 has 相同键 in间值被group in 一起.

2.7 merge (Combine)

merge is in Map阶段之 after , Reduce阶段之 before for in间键值 for for局部汇总 过程. merge可以reducingnetwork传输 data量, improvingsystemperformance.

3. MapReducearchitecturedesign

MapReduceadoptsmaster-slave (Master-Slave) architecture, 主要package含以 under component:

3.1 JobTracker

JobTracker is MapReduce 主node, 负责management整个job 执行过程. 它 主要functionsincluding:

  • 接收客户端submitting job
  • 将job分解 for many 个Map and Reducetask
  • 将task分配给TaskTracker执行
  • monitortask 执行status
  • processingtask失败 circumstances

3.2 TaskTracker

TaskTracker is MapReduce from node, 负责执行具体 Map or Reducetask. 它 主要functionsincluding:

  • 接收JobTracker分配 task
  • 执行Map or Reducetask
  • 向JobTracker报告task 执行status
  • processingtask失败 circumstances

Notes

in Hadoop 2.xversionin, MapReduce architecture发生了变化, JobTracker and TaskTracker被YARN (Yet Another Resource Negotiator) 取代. YARN负责resourcemanagement and jobscheduling, 而MapReduce仅负责计算.

4. MapReduceworkflow程

MapReducejob 执行过程可以分 for 以 under 几个步骤:

  1. jobsubmitting: 客户端向JobTrackersubmittingMapReducejob
  2. job初始化: JobTracker将job分解 for many 个Map and Reducetask
  3. task分配: JobTracker将task分配给空闲 TaskTracker执行
  4. task执行: TaskTracker执行Map or Reducetask
  5. in间dataprocessing: Maptask生成in间键值 for , 经过partition, sort and merge after , 传递给Reducetask
  6. 结果输出: Reducetask生成最终结果, 并将结果写入HDFS
  7. jobcompletion: 当所 has task执行completion after , JobTracker向客户端返回jobcompletion message

5. MapReduceprogrammingmodel

MapReduceproviding了 simple programmingmodel, Development者只需要implementation两个corefunction: Map and Reduce.

5.1 Mapfunction

Mapfunction 输入 is 键值 for , 输出 is in间键值 for . Mapfunction signaturesuch as under :

map(input_key, input_value) → list(intermediate_key, intermediate_value)

5.2 Reducefunction

Reducefunction 输入 is in间键 and 该键 for 应 所 has in间值 iterators, 输出 is 最终 键值 for . Reducefunction signaturesuch as under :

reduce(intermediate_key, list(intermediate_value)) → list(output_key, output_value)

5.3 WordCountexample

WordCount is MapReduce 经典example, 用于statistics文本in每个单词出现 次数. under 面 is WordCount Javaimplementation:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Mapclass
  public static class TokenizerMapper
       extends Mapper{ // Input: (Object, Text), Output: (Text, IntWritable)

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // 将输入文本分割成单词
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        // 输出in间键值 for : (word, 1)
        context.write(word, one);
      }
    }
  }

  // Reduceclass
  public static class IntSumReducer
       extends Reducer { // Input: (Text, IntWritable), Output: (Text, IntWritable)
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable values, // values is 具 has 相同key 所 has value iterators
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      // 累加相同单词 计数
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      // 输出最终键值 for : (word, sum)
      context.write(key, result);
    }
  }

  // 主function
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    
    // 设置Mapclass and Reduceclass
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class); // 设置Combinerclass, 用于局部merge
    job.setReducerClass(IntSumReducer.class);
    
    // 设置输出键值 for  class型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    // 设置输入 and 输出path
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    // submittingjob并etc.待completion
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

6. MapReducejob执行

要执行MapReducejob, 需要completion以 under 步骤:

6.1 编译code

$ javac -cp $HADOOP_HOME/share/hadoop/common/hadoop-common-3.3.1.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.1.jar WordCount.java

6.2 creationJARfile

$ jar cf wc.jar WordCount*.class

6.3 submittingjob

$ hadoop jar wc.jar WordCount /input /output

6.4 查看结果

$ hdfs dfs -cat /output/part-r-00000

7. MapReduceoptimizationtechniques

7.1 合理设置shard big small

shard big small 直接影响Maptask 数量. such as果shard太 small , 会导致Maptask数量过 many , 增加jobscheduling 开销; such as果shard太 big , 会导致单个Maptaskprocessing data量过 big , 影响job parallel度.

7.2 usingCombiner

Combiner可以 in Map阶段之 after for in间键值 for for局部汇总, reducingnetwork传输 data量, improvingsystemperformance.

7.3 合理设置Reducetask数量

Reducetask数量直接影响job parallel度 and 输出file 数量. such as果Reducetask数量太 few , 会导致单个Reducetaskprocessing data量过 big ; such as果Reducetask数量太 many , 会导致输出file数量过 many , 增加 after 续processing 开销.

7.4 data本地化

MapReduce会尽量将Maptask分配 to datawhere node on 执行, reducingnetwork传输 data量. Development者可以through合理design输入data 分布, improvingdata本地化 比例.

7.5 压缩in间data

压缩in间data可以reducingnetwork传输 data量, improvingsystemperformance. MapReducesupport many 种压缩格式, such asGzip, Snappyetc..

8. MapReduce Pros and Cons

8.1 优点

  • simple 易用: providing simple programmingmodel, Development者只需要关注业务逻辑
  • high reliability: throughfault tolerancemechanism, 确保 in nodefailure时taskable to继续执行
  • high scale性: 可以through添加node来improvingprocessingcapacity
  • high throughput: 适合processinglarge-scaledata集 批量processing
  • 成熟 stable : 经过 many 年 发展 and test, 已经成 for distributed计算 经典framework

8.2 缺点

  • 实时processingcapacity has 限: 传统MapReduce适合批processing, 实时processingcapacity相 for 较弱
  • latency较 high : job 启动 and scheduling需要一定 时间
  • 不适合iteration计算: for 于需要 many 次iteration algorithms, MapReduce performance较差
  • programmingmodel相 for simple : for 于 complex 计算task, 可能需要writing big 量 code

实践练习

练习1: WordCountexample

编译并runWordCountexample, statistics一个文本filein每个单词出现 次数.

练习2: 倒排index

implementation一个倒排index程序, statistics每个单词 in 哪些filein出现过, 以及出现 次数.

练习3: TopNissues

implementation一个TopN程序, 找出文本in出现次数最 many before N个单词.

练习4: MapReduceoptimization

for WordCountexampleforoptimization, usingCombiner, 压缩etc.techniquesimprovingjob 执行efficiency.

9. summarized

本tutorial深入介绍了MapReduce core concepts, architecturedesign, working principles and programmingmodel. MapReduce is adistributed计算model, 具 has simple 易用, high reliability, high scale性 and high throughputetc.特点, 适合processinglarge-scaledata集 批量processing.

MapReduce programmingmodel基于两个corefunction: Map and Reduce. Development者只需要implementation这两个function, 就可以writingdistributed计算程序. MapReduce workflow程includingjobsubmitting, job初始化, task分配, task执行, in间dataprocessing, 结果输出 and jobcompletionetc.步骤.

虽然MapReduce in 实时processing and iteration计算方面存 in 一定 局限性, 但它仍然 is distributed计算领域 经典framework, widely used in各个领域 big dataprocessingtask.