MapReduce主要接口有哪些

网友投稿 174 2024-02-02


MapReduce主要接口有哪些

本文主要介绍"MapReduce主要接口有哪些",希望能够解决您遇到有关问题,下面我们一起来看这篇 "MapReduce主要接口有哪些" 文章。

(1) InputFormat接口

用户需要实现该接口以指定输入文件的内容格式。该接口有两个方法

public interface InputFormat {

InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

     RecordReadergetRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException;

}

其中getSplits函数将所有输入数据分成numSplits个split,每个split交给一个map task处理。getRecordReader函数提供一个用户解析split的迭代器对象,它将split中的每个record解析成key/value对。

Hadoop本身提供了一些InputFormat:

TextInputFormat

作为默认的文件输入格式,用于读取纯文本文件,文件被分为一系列以LF或者CR结束的行,key是每一行的位置偏移量,是LongWritable类型的,value是每一行的内容,为Text类型。

KeyValueTextInputFormat

同样用于读取文件,如果行被分隔符(缺省是tab)分割为两部分,第一部分为key,剩下的部分为value;如果没有分隔符,整行作为 key,value为空。

SequenceFileInputFormat

用于读取sequence file。 sequence file是Hadoop用于存储数据自定义格式的binary文件。它有两个子类:SequenceFileAsBinaryInputFormat,将 key和value以BytesWritable的类型读出;SequenceFileAsTextInputFormat,将key和value以Text类型读出。

SequenceFileInputFilter

根据filter从sequence文件中取得部分满足条件的数据,通过 setFilterClass指定Filter,内置了三种 Filter,RegexFilter取key值满足指定的正则表达式的记录;PercentFilter通过指定参数f,取记录行数%f==0的记录;MD5Filter通过指定参数f,取MD5(key)%f==0的记录。

NLineInputFormat

可以将文件以行为单位进行split,比如文件的每一行对应一个map。得到的key是每一行的位置偏移量(LongWritable类型),value是每一行的内容,Text类型。

MultipleInputs

用于多个数据源的join

(2)Mapper接口用户需继承Mapper接口实现自己的Mapper,Mapper中必须实现的函数是

Mapper有setup(),map(),cleanup()和run()四个方法。其中setup()一般是用来进行一些map()前的准备工作,map()则一般承担主要的处理工作,cleanup()则是收尾工作如关闭文件或者执行map()后的K-V分发等。run()方法提供了setup->map->cleanup()的执行模板。

(3)Partitioner接口

用户需继承该接口实现自己的Partitioner以指定map task产生的key/value对交给哪个reduce task处理,好的Partitioner能让每个reduce task处理的数据相近,从而达到负载均衡。Partitioner中需实现的函数是

getPartition(  K2   key, V2 value, int numPartitions)

该函数返回对应的reduce task ID。

用户如果不提供Partitioner,Hadoop会使用默认的(实际上是个hash函数)。

Partitioner如何使用

•实现Partitioner接口覆盖getPartition()方法

•Partitioner示例

public static class MyPartitioner extends Partitioner {

         @Override

            public int getPartition(Text key, Text value, int numPartitions) {

             }

}

Partitioner需求示例

•需求描述

•数据文件中含有省份

•需要相同的省份送到相同的Reduce里

•从而产生不同的文件

•步骤

•实现Partitioner,覆盖getPartition

•根据省份字段进行切分

(4)Combiner

combine函数把一个map函数产生的<key,value>对(多个key, value)合并成一个新的<key2,value2>. 将新的<key2,value2>作为输入到reduce函数中,其格式与reduce函数相同。Combiner使得map task与reduce task之间的数据传输量大大减小,可明显提高性能。大多数情况下,Combiner与Reducer相同。

什么情况下可以使用Combiner

•可以对记录进行汇总统计的场景,如求和。

•求平均数的场景就不可以使用了

Combiner执行时机

•运行combiner函数的时机有可能会是merge完成之前,或者之后,这个时机可以由一个参数控制,即 min.num.spill.for.combine(default 3)

•当job中设定了combiner,并且spill数最少有3个的时候,那么combiner函数就会在merge产生结果文件之前运行

•通过这样的方式,就可以在spill非常多需要merge,并且很多数据需要做conbine的时候,减少写入到磁盘文件的数据数量,同样是为了减少对磁盘的读写频率,有可能达到优化作业的目的。

•Combiner也有可能不执行, Combiner会考虑当时集群的负载情况。

Combiner如何使用

•继承Reducer类

public static class Combiner extends Reducer {

       public void reduce(Text key, Iterator values,

               OutputCollectoroutput, Reporter reporter)

               throws IOException {

                 }

    }

(5)Reducer接口

实现自己的Reducer,必须实现reduce函数

(6)OutputFormat用户通过OutputFormat指定输出文件的内容格式,不过它没有split。每个reduce task将其数据写入自己的文件,文件名为part-nnnnn,其中nnnnn为reduce task的ID。

public abstract class OutputFormat {  

  /** 

   * 创建一个记录写入器

   */ 

  public abstract RecordWritergetRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;

  /** 

   * 检查结果输出的存储空间是否有效

   */ 

public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;

  /**

   * 创建一个任务提交器

   */ 

public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;

}

编程技术 和 程序设计

本文主要介绍"查询设置Fetch task来不启用Mapreduce job的方式有哪些",希望能够解决您遇到有关问题,下面我们一起来看这篇 "查询设置Fetch ...


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Mybatis中Mapper接口有什么用
下一篇:PHP接口多继承及tarits实现多继承效果的方法
相关文章

 发表评论

暂时没有评论,来抢沙发吧~