MapTask阶段shuffle源码分析

网友投稿 198 2023-01-15


MapTask阶段shuffle源码分析

1. 收集阶段

在Mapper中,调用context.write(key,value)实际是调用代理NewOutPutCollector的wirte方法

public void write(KEYOUT key, VALUEOUT value

) throws IOException, InterruptedException {

output.write(key, value);

}

实际调用的是MapOutPutBuffer的collect(),在进行收集前,调用partitioner来计算每个key-value的分区号

@Override

public void write(K key, V value) throws IOException, InterruptedException {

collector.collect(key, value,

partitioner.getPartition(key, value, partitions));

}

2. NewOutPutCollector对象的创建

@SuppressWarnings("unchecked")

NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,

JobConf job,

TaskUmbilicalProtocol umbilical,

TaskReporter reporter

) throws IOException, ClassNotFoundException {

// 创建实际用来收集key-value的缓存区对象

collector = createSortingCollector(job, reporter);

// 获取总的分区个数

partitions = jobContext.getNumReduceTasks();

if (partitions > 1) {

partitioner = (org.apache.hadoop.mapreduce.Partitioner)

ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);

} else {

// 默认情况,直接创建一个匿名内部类,所有的key-value都分配到0号分区

partitioner = new org.apache.hadoop.mapreduce.Partitioner() {

@Override

public int getPartition(K key, V value, int numPartitions) {

return partitions - 1;

}

};

}

}

3. 创建环形缓冲区对象

@SuppressWarnings("unchecked")

private MapOutputCollector

createSortingCollector(JobConf job, TaskReporter reporter)

throws IOException, ClassNotFoundException {

MapOutputCollector.Context context =

new MapOutputCollector.Context(this, job, reporter);

// 从当前Job的配置中,获取mapreduce.job.map.output.collector.class,如果没有设置,使用MapOutputBuffer.class

Class>[] collectorClasses = job.getClasses(

JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);

int remainingCollectors = collectorClasses.length;

Exception lastException = null;

for (Class clazz : collectorClasses) {

try {

if (!MapOutputCollector.class.isAssignableFrom(clazz)) {

throw new IOException("Invalid output collector class: " + clazz.getName() +

" (does not implement MapOutputCollector)");

}

Class extends MapOutputCollector> subclazz =

clazz.asSubclass(MapOutputCollector.class);

LOG.debug("Trying map output collector class: " + subclazz.getName());

// 创建缓冲区对象

MapOutputCollector collector =

ReflectionUtils.newInstance(subclazz, job);

// 创建完缓冲区对象后,执行初始化

collector.init(context);

LOG.info("Map output collector class = " + collector.getClass().getName());

return collector;

} catch (Exception e) {

String msg = "Unable to initialize MapOutputCollector " + clazz.getName();

if (--remainingCollectors > 0) {

msg += " (" + remainingCollectors + " more collector(s) to try)";

}

lastException = e;

LOG.warn(msg, e);

}

}

throw new IOException("Initialization of all the collectors failed. " +

"Error in last collector was :" + lastException.getMessage(), lastException);

}

3. MapOutPutBuffer的初始化 &nbspmfNuug; 环形缓冲区对象

@SuppressWarnings("unchecked")

public void init(MapOutputCollector.Context context

) throws IOException, ClassNotFoundException {

job = context.getJobConf();

reporter = context.getReporter();

mapTask = context.getMapTask();

mapOutputFile = mapTask.getMapOutputFile();

sortPhase = mapTask.getSortPhase();

spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);

// 获取分区总个数,取决于ReduceTask的数量

partitions = job.getNumReduceTasks();

rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

//sanity checks

// 从当前配置中,获取mapreduce.map.sort.spill.percent,如果没有设置,就是0.8

final float spillper =

job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);

// 获取mapreduce.task.io.sort.mb,如果没设置,就是100MB

final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);

indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,

INDEX_CACHE_MEMORY_LIMIT_DEFAULT);

if (spillper > (float)1.0 || spillper <= (float)0.0) {

throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +

"\": " + spillper);

}

if ((sortmb & 0x7FF) != sortmb) {

throw new IOException(

"Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);

}

// 在溢写前,对key-value排序,采用的排序器,使用快速排序,只排索引

sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",

QuickSort.class, IndexedSorter.class), job);

// buffers and accounting

int maxMemUsage = sortmb << 20;

maxMemUsage -= maxMemUsage % METASIZE;

// 存放key-value

kvbuffer = new byte[maxMemUsage];

bufvoid = kvbuffer.length;

// 存储key-value的属性信息,分区号,索引等

kvmeta = ByteBuffer.wrap(kvbuffer)

.order(ByteOrder.nativeOrder())

.asIntBuffer();mfNuug

setEquator(0);

bufstart = bufend = bufindex = equator;

kvstart = kvend = kvindex;

maxRec = kvmeta.capacity() / NMETA;

softLimit = (int)(kvbuffer.length * spillper);

bufferRemaining = softLimit;

LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);

LOG.info("soft limit at " + softLimit);

LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);

LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

// k/v serialization

// 获取快速排序的Key的比较器,排序只按照key进行排序!

comparator = job.getOutputKeyComparator();

// 获取key-value的序列化器

keyClass = (Class)job.getMapOutputKeyClass();

valClass = (Class)job.getMapOutputValueClass();

serializationFactory = new SerializationFactory(job);

keySerializer = serializationFactory.getSerializer(keyClass);

keySerializer.open(bb);

valSerializer = serializationFactory.getSerializer(valClass);

valSerializer.open(bb);

// output counters

mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);

mapOutputRecordCounter =

reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);

fileOutputmfNuugByteCounter = reporter

.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

// 溢写到磁盘,可以使用一个压缩格式! 获取指定的压缩编解码器

// compression

if (job.getCompressMapOutput()) {

Class extends CompressionCodec> codecClass =

job.getMapOutputCompressorClass(DefaultCodec.class);

codec = ReflectionUtils.newInstance(codecClass, job);

} else {

codec = null;

}

// 获取Combiner组件

// combiner

final Counters.Counter combineInputCounter =

reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);

combinerRunner = CombinerRunner.create(job, getTaskID(),

combineInputCounter,

reporter, null);

if (combinerRunner != null) {

final Counters.Counter combineOutputCounter =

reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);

combineCollector= new CombineOutputCollector(combineOutputCounter, reporter, job);

} else {

combineCollector = null;

}

spillInProgress = false;

minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);

// 设置溢写线程在后台运行,溢写是在后台运行另外一个溢写线程!和收集是两个线程!

spillThread.setDaemon(true);

spillThread.setName("SpillThread");

spillLock.lock();

try {

// 启动线程

spillThread.start();

while (!spillThreadRunning) {

spillDone.await();

}

} catch (InterruptedException e) {

throw new IOException("Spill thread failed to initialize", e);

} finally {

spillLock.unlock();

}

if (sortSpillException != null) {

throw new IOException("Spill thread failed to initialize",

sortSpillException);

}

}

4. Paritionner的获取

从配置中读取mapreduce.job.partitioner.class,如果没有指定,采用HashPartitioner.class

如果reduceTask > 1, 还没有设置分区组件,使用HashPartitioner

@SuppressWarnings("unchecked")

public Class extends Partitioner,?>> getPartitionerClass()

throws ClassNotFoundException {

return (Class extends Partitioner,?>>)

conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);

}

public class HashPartitioner extends Partitioner {

/** Use {@link Object#hashCode()} to partition. **/

public int getPartition(K key, V value,

int numReduceTasks) {

return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

}

}

分区号的限制:0 <= 分区号 < 总的分区数(reduceTask的个数)

if (partition < 0 || partition >= partitions) {

throw new IOException("Illegal partition for " + key + " (" +

partition + ")");

}

5.MapTask shuffle的流程

①在map()调用context.write()

②调用MapoutPutBuffer的collect()

调用分区组件Partitionner计算当前这组key-value的分区号

③将当前key-value收集到MapOutPutBuffer中

如果超过溢写的阀值,在后台启动溢写线程,来进行溢写!

④溢写前,先根据分区号,将相同分区号的key-value,采用快速排序算法,进行排序!

排序并不在内存中移动key-value,而是记录排序后key-value的有序索引!

⑤ 开始溢写,按照排序后有序的索引,将文件写入到一个临时的溢写文件中

如果没有定义Combiner,直接溢写!

                            如果定义了Combiner,使用CombinerRunner.conbine()对key-value处理后再次溢写!

⑥多次溢写后,每次溢写都会产生一个临时文件

⑦最后,执行一次flush(),将剩余的key-value进行溢写

⑧MergeParts: 将多次溢写的结果,保存为一个总的文件!

在合并为一个总的文件前,会执行归并排序,保证合并后的文件,各个分区也是有序的!

                     如果定义了Conbiner,Conbiner会再次运行(前提是溢写的文件个数大于3)!

                     否则,就直接溢写!

⑨最终保证生成一个最终的文件,这个文件根据总区号,分为若干部分,每个部分的key-value都已经排好序,等待ReduceTask来拷贝相应分区的数据

6. Combiner

combiner其实就是Reducer类型:

Class extends Reducer> cls =

(Class extends Reducer>) job.getCombinerClass();

Combiner的运行时机:

MapTask:

①每次溢写前,如果指定了Combiner,会运行

              ②将多个溢写片段,进行合并为一个最终的文件时,也会运行Combiner,前提是片段数>=3

ReduceTask:

③reduceTask在运行时,需要启动shuffle进程拷贝MapTask产生的数据!

数据在copy后,进入shuffle工作的内存,在内存中进行merge和sort!

                     数据过多,内部不够,将部分数据溢写在磁盘!

                     如果有溢写的过程,那么combiner会再次运行!

①一定会运行,②,③需要条件!

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。如果你想了解更多相关内容请查看下面相关链接


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java线程池使用后到底要关闭吗
下一篇:springboot与redis的简单整合实例
相关文章

 发表评论

暂时没有评论,来抢沙发吧~