MapReduce

发布于 2022-05-04  97 次阅读


Hadoop I/O 流操作

数据完整性

1. CRC-32校验和简介

HDFS会对所有写入数据计算校验和,并在读取数据时再次计算校验和进行验证。
针对由每个 io.bytes.per.checksum指定字节的数据计算校验和。默认情况下为512B,由于CRC-32校验和是4B,所以存储校验和的额外开销低于1% 。

2. 写入数据及其校验和

Datanode负责在验证收到的数据后存储数据及其校验和。
它在收到客户端的数据或复制期间其他datanode的数据时执行这个操作。

  1. 正在写数据的客户端将数据及其校验和发送到由一系列datanode组成的管线,管线中最后一个datanode负责验证校验和。
  2. 如果datanode监测到错误,客户端便会收到一个CheckSumException异常。

3. 读取数据时的校验

客户端从datanode中读取数据时,也会验证校验和,将他们与datanode中存储的校验和进行比较。
每个datanode均持久保存有一个用于验证的校验和日志,所以它知道每个数据块的最后一次验证时间。
客户端成功验证一个数据块后,会告诉这个datanode,datanode由此更新日志。

4. 数据自动检验

每个datanode也会有一个后台线程中运行一个DataBlockScanner,定期验证存储在这个datanode上的所有数据块。

5. 数据副本修复

HDFS存储着每个数据块的副本,可通过复制完好的数据副本来修复损坏的数据块,进而得到一个新的、完好无损的副本。

  1. 客户端在读取数据块时,如果监测到错误,就向namenode报告已损坏的数据块及其正在尝试操作的这个datanode,最后才抛出CheckSumException异常。
  2. Namenode将这个已损坏的数据块的副本标记为已损坏,之后它安排这个数据块的一个副本复制到另一个datanode,这样数据块的副本因子又回到期望水平。
  3. 将已损坏的数据块副本便被删除

6. 不使用校验和的方式

在FileSystem的open()之前通过设置FileSystem的setVerifyCheckSum(false)方法禁用校验和,或者命令行使用get时候添加选项-ignoreCrc或者直接使用-copyToLocal

//jave
fs.setVerifyChecksum(false)

//shell
Hadoop fs –get –ignoreCrc hdfs://master:9000/

序列化与反序列化

序列化(Serialization)是指将结构化对象转化为字节流(或其他数据传输协议),以便在网络上传输或写到磁盘进行永久存储。

反序列化(deserialization)是指将字节流转回结构化对象的逆过程

1. 序列化的分布式应用

序列化在分布式数据处理的两大领域经常出现。

  • 进程间通信
  • 永久存储

Hadoop中,系统中多个节点上进程间的通信是通过“远程过程调用” (Remote Procedure Call) RPC实现的。
RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息。

数据永久存储所期望的4个RPC序列化属性非常重要:

  1. 紧凑。高效实用存储空间
  2. 快速。读写数据的额外开销比较小
  3. 可扩展。可以透明的读取老格式的数据
  4. 互操作。可以使用不同的语言读写永久存储的数据

2. Hadoop序列化

1.Hadoop使用自己的序列化格式 Writable 。
2.Writable是Hadoop的核心。它紧凑、速度快,但很难用Java以外的语言进行扩展和使用。
3.序列化框架 Avro ,它不受具体语言限制,克服了 Writable 的局限性。

3. Hadoop序列化实现

Writable 是Hadoop的序列化格式,hadoop定义了这样⼀个Writable接⼝。

public interface Writable() {
    //序列化
    void write(DataOutput out) throws IOException;
    //反序列化
    void readFields(DataInput in) throws IOException;
}

4. Hadoop常用数据序列化类型

java类型 hadoop Writable类型
Boolen BoolenWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map Mapwritable
Array ArrayWritable
Null NullWritable

基于文件的数据结构 SequenceFile

日志文件中的每一条日志记录是一行文本。如果想将一条条记录转化为二进制类型,纯文本存储操作是不合适的。这种情况下,Hadoop的SequenceFile类非常合适,因为上述类提供了二进制的键/值对的永久存储的数据结构。
当作为日志文件的存储格式时,你可以自己选择键,比如由LongWritable类型表示的时间戳,以及值可以是Writable类型,用于表示日志记录的数量。

SequenceFiles同样也可以作为小文件的容器。而HDFS和MapReduce是针对大文件进行优化的,所以通过SequenceFile类型将小文件包装起来,可以获得更高效率的存储和处理。

Mapreduce

Mapreduce编程框架是一种可用于数据处理的编程模型
Hadoop平台可以运行由各种计算机语言编写的MapReduce程序。
MapReduce程序本质上是并行运行的,因此可以将大规模的数据分析任务交给任何一个拥有强大计算能力的运行商。

1. MapReduce编程模型简介

从MapReduce自身的命名特点可以看出,MapReduce由两个阶段组成:Map和Reduce。用户只需 map() 和 reduce() 两个函数,即可完成简单的分布式程序设计。

map()函数以key/value对作为输入,产生另外一系列 key/value对 作为中间输出写入本地磁盘。MapReduce框架会自动将这些中间数据按照key值进行聚合,且key值相同的数据被统一交给reduce()函数处理。

reduce()函数以key及对应的value列表作为输入,经合并key相同的value值后,产生另外一系列key/value对作为最终输出写入HDFS。

2. MapReduce 的优缺点

优点

  1. MapReduce 易于编程它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量 廉价的 PC 机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得 MapReduce 编程变得非常流行。
  2. 良好的扩展性。当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力
  3. 高容错性。MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行, 不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。
  4. 适合 PB 级以上海量数据的离线处理可以实现上千台服务器集群并发工作,提供数据处理能力

缺点

  1. 不擅长实时计算。MapReduce 返回结果的时间较长。
  2. 不擅长流式计算。MapReduce 的只能处理离线的静态数据。
  3. 不擅长大量的迭代计算。需要将中间计算结果写入磁盘。

3. MapReduce工作流程

一个完整的 MapReduce 程序在分布式运行时有三类实例进程:

  1. Mapreduce Application Master:负责整个程序的过程调度及状态协调。
  2. MapTask:负责 Map 阶段的整个数据处理流程。
  3. ReduceTask:负责 Reduce 阶段的整个数据处理流程。

MapReduce分布式的运算程序往往需要分成至少 2 个阶段。

  1. 第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。
  2. 第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段的所有 MapTask 并发实例的输出。

MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。

4. MapReduce 编程

1. Mapper

  1. 用户自定义的Map类要继承Mapper父类
  2. Map的输入数据是KV对的形式(KV的类型可自定义)
  3. Map中的业务逻辑写在map()方法中
  4. Map的输出数据是KV对的形式(KV的类型可自定义)
  5. map()方法(MapTask进程)对每一个\<K,V>调用一次

public class MyMapper extends Mapper <KEYIN, VALUEIN, KEYOUT, VALUEOUT>

MapReduce 有默认的读取数据组件: TextInputFormat

读数据的方式:一行一行读取数据,返回 \<key, value> 键值对

  • KEYIN
    • map阶段输入 \<key, value> 中 key 的类
    • 默认组件下,是起始位置的偏移量,因此是 LongWritable
  • VALUEIN
    • map阶段输入 \<key, value> 中 value 的类型
    • 默认组件下,是每一行的内容,因此是 Text
  • KEYOUT
    • map阶段输出 \<key, value> 中 key 的类
    • 与需求类型相关,如 WordCount 需求输出单词,因此是Text
  • VALUEOUT
    • map阶段输出 \<key, value> 中 value 的类型
    • 与需求类型相关,如 WordCount 需求输出单词次数,因此是IntWritable
/*
* map 方法是 Map 阶段的核心方法,也是具体需求逻辑实现的方法
* 该方法被调用次数和输入的 <k, v> 有关,每当 TextInputFormat 读取返回一个<k, v>,就调用一次 map 方法进行处理
* 默认情况下,map 基于行处理数据
*/
/*
* 以 WordCount 为例
* 源文件为:
* hello word
* hello hadoop
* 处理第一行,输出<hello, 1>, <word, 1>
* 处理第二行,输出<hello, 1>, <hadoop, 1>
*/

public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    private Text k = new Text();
    private IntWritable v =new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws InterruptedException, IOException {
        //对每一行进行处理,转化为String
        String line =value.toString();
        //根据分隔符进行切割
        String[] words=line.split(" ");
        for (String word : words) {
            //输出数据
            k.set(word);
            context.write(k,v);
        }
    }
}

2. Reducer

  1. 用户自定义的Reducer要继承自己的父类
  2. Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
  3. Reducer的业务逻辑写在reduce()方法中
  4. ReduceTask进程对每一组相同k的\<k,v>组调用一次reduce()方法

public class MyReducer extends Reducer <KEYIN, VALUEIN, KEYOUT, VALUEOUT>

  • KEYIN
    • map阶段输出 \<key, value> 中 key 的类
  • VALUEIN
    • map阶段输出 \<key, value> 中 value 的类型
  • KEYOUT
    • Reduce阶段输出 \<key, value> 中 key 的类
    • 与需求类型相关,如 WordCount 需求输出单词,因此是Text
  • VALUEOUT
    • Reduce阶段输出 \<key, value> 中 value 的类型
    • 与需求类型相关,如 WordCount 需求输出单词次数,因此是IntWritable
/*
* 当 Map 的所有输出数据来到 Reduce 后,该如何调用Reduce方法处理?
* Map阶段返回的键值对为<hello, 1>, <word, 1>, <hello, 1>, <hadoop, 1>
* 1. 排序(默认以key的字典序)
*   <hadoop, 1>, <hello, 1>, <hello, 1>, <word, 1>
* 2. 分组(key相同的分为一组)
*   <hadoop, 1>
*   <hello, 1>, <hello, 1>
*   <word, 1>
* 3. 分组之后形成新的键值对,调用一次reduce
*   <hadoop, [1]>
*   <hello, [1, 1]>
*   <word, [1]>
*/
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws InterruptedException, IOException {
        //求和
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        //输出
        v.set(sum);
        context.write(key, v);
    }
}

3. Driver

该类是 mapreduce 程序客户端驱动类 主要是构建 Job 对象实例
指定各种组件属性

public class WordCountDriver {
    public static void main(String[] args) throws ClassNotFoundException, InterruptedException, IOException {
        //配置文件对象
        Configuration conf = new Configuration();
        //跨平台设置(百度)
        conf.set("mapreduce.app-submission.cross-platform","true");
        //设置作业实例
        Job job = Job.getInstance(conf,WordCountDriver.class.getSimpleName());

        //设置job运行时的程序入口主类
        job.setJarByClass(WordCountDriver.class);
        job.setJar("E:\\代码\\Java\\demo\\target\\HDFSclient-1.0-SNAPSHOT.jar");

        //设置map函数和reduce函数的实现类对象
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //通过 job 设置输入/输出格式为文本格式(我们目前操作的基本类型都是都是文本类型)
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        //设置 map 阶段输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //设置 reduce 阶段输出的kv类型,也就是最终输出的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //输入输出路径
        FileInputFormat.addInputPath(job, new Path("/map/input/a"));
        FileOutputFormat.setOutputPath(job, new Path("/map/output"));

        //提交作业并等待执行完成
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Hadoop MapReduce 架构

MapReduce架构核心组件

1. Client客户端

用户编写的MapReduce程序通过Client提交到JobTracker端;同时,用户可通过Client提供的一些接口查看作业的运行状态。在Hadoop内部用“作业”(Job)表示MapReduce程序。一个MapReduce程序可对应若干个作业,而每个作业会被分解成若干个Map/Reduce任务(Task)。

2. JobTracker

JobTracke负责资源监控和作业调度。JobTracker 监控所有TaskTracker 与job的健康状况,一旦发现失败,就将相应的任务转移到其他节点;同时,JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop 中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。

3. TaskTracker

TaskTracker 会周期性地通过Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker 使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop 调度器的作用就是将各个TaskTracker 上的空闲slot 分配给Task 使用。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和Reduce Task 使用。TaskTracker 通过slot 数目(可配置参数)限定Task 的并发度。

4. Task

Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动。HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意的是,split 的多少决定了Map Task 的数目,因为每个split 只会交给一个Map Task 处理。

Map Task

Map Task 执行过程如下图所示。由该图可知,Map Task 先将对应的split 迭代解析成一个个 key/value 对,依次调用用户自定义的 map() 函数进行处理,最终将临时结果存放到本地磁盘上,其中临时数据被分成若干个partition,每个partition 将被一个Reduce Task 处理。

Reduce Task

Reduce Task 执行过程下图所示。该过程分为三个阶段:

  1. 从远程节点上读取MapTask 中间结果(称为“Shuffle 阶段”);
  2. 按照key 对key/value 对进行排序(称为“Sort 阶段”);
  3. 依次读取\<key, value list>,调用用户自定义的reduce() 函数处理,并将最终结果存到HDFS 上(称为“Reduce 阶段”)。

编程实战

1. 自定义对象,实现序列化接口(Writable)

  1. 必须实现Writable接口
  2. 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
  3. 重写序列化方法
  4. 重写反序列化方法
  5. 注意反序列化的顺序和序列化的顺序完全一致
  6. 要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。
  7. 如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduceShuffle过程要求对key必须能排序

Writable 有⼀个⼦接⼝是 WritableComparable, WritableComparable 是既可实现序列化, 也可以对key进⾏⽐较, 我们这⾥可以通
过⾃定义 Key 实现 WritableComparable 来实现我们的排序功能。

public class NewKey implements WritableComparable<NewKey> {
    //两个参数
    int first, second;

    //无参构造方法
    public NewKey(){

    }
    //有参构造方法
    public NewKey(int first, int second){
        this.first = first;
        this.second = second;
    }

    //序列化
    @override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.wrtieInt(second);
    }

    //反序列化,反序列化的顺序和序列化的顺序完全一致
    @override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readInt()
        this.second = in.readInt()
    }

    //重写toString
    @override
    public String toString() {
        return first + " " + second;
    }

    //重写比较逻辑CompareTo()
    public int CompareTo(NewKey x){
        int m = this.first - x.first;
        if(m != 0) return m;
        else return this.second - x.second;
    }
}