MR 中的 Reduce 端源码分析

分享 AIZero ⋅ 于 2020-10-10 12:46:36 ⋅ 88 阅读
Reduce阶段的核心
  • Copy和Merge阶段:Copy阶段和Merge阶段基本同时进行的
  • Reduce阶段:返回伪迭代器,执行Reduce函数输出数据到HDFS


核心面板:ReduceTask
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

    if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
    // start thread that will handle communication with parent
    TaskReporter reporter = startReporter(umbilical);

    boolean useNewApi = job.getUseNewReducer();
    // 初始化jobContext,taskContext,设置输出目录
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    // Initialize the codec
    // 控制map端输出的压缩设置
    codec = initCodec();
    RawKeyValueIterator rIter = null;
    ShuffleConsumerPlugin shuffleConsumerPlugin = null;

    Class combinerClass = conf.getCombinerClass();
    CombineOutputCollector combineCollector = 
      (null != combinerClass) ? 
     new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

    Class<? extends ShuffleConsumerPlugin> clazz =
          job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);

    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
    LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

    ShuffleConsumerPlugin.Context shuffleContext = 
      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                  super.lDirAlloc, reporter, codec, 
                  combinerClass, combineCollector, 
                  spilledRecordsCounter, reduceCombineInputCounter,
                  shuffledMapsCounter,
                  reduceShuffleBytes, failedShuffleCounter,
                  mergedMapOutputsCounter,
                  taskStatus, copyPhase, sortPhase, this,
                  mapOutputFile, localMapFiles);
    // 创建shuffle所需要的组件:scheduler,merger 【1入口】
    shuffleConsumerPlugin.init(shuffleContext);
    // copy阶段的核心 【2入口】
    rIter = shuffleConsumerPlugin.run();

    // free up the data structures
    mapOutputFilesOnDisk.clear();

    sortPhase.complete();                         // sort is complete
    setPhase(TaskStatus.Phase.REDUCE); 
    statusUpdate(umbilical);
    Class keyClass = job.getMapOutputKeyClass();
    Class valueClass = job.getMapOutputValueClass();
    // 插曲:设置分组比较器,也包含了类比较器
    RawComparator comparator = job.getOutputValueGroupingComparator();

    if (useNewApi) {
      // reduce的核心【3入口】
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }

    shuffleConsumerPlugin.close();
    done(umbilical, reporter);
  }
copy和merge的归纳

首先ReduceTask会率先开启Merge的三个线程,其中内存到内存默认是不开启的。Fetcher线程会向每个Map拉取数据,在内存中每个Map对应一块数据,放入reduce任务节点的内存之中。到达限值之后启动内存到磁盘的merger线程,这里的Combiner的设置会生效。当所有的map输出全部拉取完成,在reduce端的内存中的数据和硬盘中的数据同时共存,需要进行最后一次Merger,先进行内存到硬盘的merge,然后进行硬盘到硬盘的merge。

Reduce的内存缓冲区可通过mapred.job.shuffle.input.buffer.percent配置,默认是JVM的heap size的70%。内存到磁盘merge的启动门限可以通过mapred.job.shuffle.merge.percent配置,默认是66%。Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。最终Reduce shuffle过程会输出一个整体有序的数据块。


入口1:Merger三大线程率先开启

1.shuffleConsumerPlugin.init(shuffleContext);

public void init(ShuffleConsumerPlugin.Context context) {
    this.context = context;

    this.reduceId = context.getReduceId();
    this.jobConf = context.getJobConf();
    this.umbilical = context.getUmbilical();
    this.reporter = context.getReporter();
    this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
    this.copyPhase = context.getCopyPhase();
    this.taskStatus = context.getStatus();
    this.reduceTask = context.getReduceTask();
    this.localMapFiles = context.getLocalMapFiles();
    // 创建scheduler组件
    scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
        this, copyPhase, context.getShuffledMapsCounter(),
        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
    // 创建merger组件
    merger = createMergeManager(context);
  }

1.1 scheduler调度器:负责从MapTask搬运数据

public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
                          TaskAttemptID reduceId,
                          ExceptionReporter reporter,
                          Progress progress,
                          Counters.Counter shuffledMapsCounter,
                          Counters.Counter reduceShuffleBytes,
                          Counters.Counter failedShuffleCounter) {
    totalMaps = job.getNumMapTasks();
    abortFailureLimit = Math.max(30, totalMaps / 10);
    copyTimeTracker = new CopyTimeTracker();
    remainingMaps = totalMaps;
    finishedMaps = new boolean[remainingMaps];
    this.reporter = reporter;
    this.status = status;
    this.reduceId = reduceId;
    this.progress = progress;
    this.shuffledMapsCounter = shuffledMapsCounter;
    this.reduceShuffleBytes = reduceShuffleBytes;
    this.failedShuffleCounter = failedShuffleCounter;
    this.startTime = Time.monotonicNow();
    lastProgressTime = startTime;
    referee.start();
    this.maxFailedUniqueFetches = Math.min(totalMaps, 5);
    this.maxFetchFailuresBeforeReporting = job.getInt(
        MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
    this.reportReadErrorImmediately = job.getBoolean(
        MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);

    this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
        MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
    this.maxHostFailures = job.getInt(
        MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
        MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);
  }

1.2 merger:负责将拉取过来的数据进行合并

protected MergeManager<K, V> createMergeManager(
      ShuffleConsumerPlugin.Context context) {
    return new MergeManagerImpl<K, V>(reduceId, jobConf, context.getLocalFS(),
        context.getLocalDirAllocator(), reporter, context.getCodec(),
        context.getCombinerClass(), context.getCombineCollector(), 
        context.getSpilledRecordsCounter(),
        context.getReduceCombineInputCounter(),
        context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
        context.getMapOutputFile());
  }

1.2.1接下来看返回值MergeManagerImpl的构造函数

public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf, 
                      FileSystem localFS,
                      LocalDirAllocator localDirAllocator,  
                      Reporter reporter,
                      CompressionCodec codec,
                      Class<? extends Reducer> combinerClass,
                      CombineOutputCollector<K,V> combineCollector,
                      Counters.Counter spilledRecordsCounter,
                      Counters.Counter reduceCombineInputCounter,
                      Counters.Counter mergedMapOutputsCounter,
                      ExceptionReporter exceptionReporter,
                      Progress mergePhase, MapOutputFile mapOutputFile) {
    ...

    // merger的三大线程:memToMemMerger,inMemoryMerger,onDiskMerger
    // 代表着三种形式,内存到内存(默认不开启),内存到硬盘,硬盘到硬盘
    boolean allowMemToMemMerge = 
      jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false);
    if (allowMemToMemMerge) {
      this.memToMemMerger = 
        new IntermediateMemoryToMemoryMerger(this,
                                             memToMemMergeOutputsThreshold);
      this.memToMemMerger.start();
    } else {
      this.memToMemMerger = null;
    }

    this.inMemoryMerger = createInMemoryMerger();
    this.inMemoryMerger.start();

    this.onDiskMerger = new OnDiskMerger(this);
    this.onDiskMerger.start();

    this.mergePhase = mergePhase;
  }

1.2.1.1 merge的三大线程都继承父类MergeThread,查看其run方法

 public void run() {
    while (true) {
      List<T> inputs = null;
      try {
        // Wait for notification to start the merge...
        synchronized (pendingToBeMerged) {
          while(pendingToBeMerged.size() <= 0) {
            pendingToBeMerged.wait();
          }
          // Pickup the inputs to merge.
          inputs = pendingToBeMerged.removeFirst();
        }

        // Merge
        merge(inputs);
      } catch (InterruptedException ie) {
        numPending.set(0);
        return;
      } catch(Throwable t) {
        numPending.set(0);
        reporter.reportException(t);
        return;
      } finally {
        synchronized (this) {
          numPending.decrementAndGet();
          notifyAll();
        }
      }
    }
  }

尤其注意的是inMemoryMerger里面包含了对combiner的执行,其他两个线程则没有

public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
      ...

        rIter = Merger.merge(jobConf, rfs,
                             (Class<K>)jobConf.getMapOutputKeyClass(),
                             (Class<V>)jobConf.getMapOutputValueClass(),
                             inMemorySegments, inMemorySegments.size(),
                             new Path(reduceId.toString()),
                             (RawComparator<K>)jobConf.getOutputKeyComparator(),
                             reporter, spilledRecordsCounter, null, null);

        if (null == combinerClass) {
          Merger.writeFile(rIter, writer, reporter, jobConf);
        } else {
          combineCollector.setWriter(writer);
          combineAndSpill(rIter, reduceCombineInputCounter);
        }
        writer.close();
        compressAwarePath = new CompressAwarePath(outputPath,
            writer.getRawLength(), writer.getCompressedLength());

       ...

  }


入口2:Fetcher线程执行
public RawKeyValueIterator run() throws IOException, InterruptedException {
    // Scale the maximum events we fetch per RPC call to mitigate OOM issues
    // on the ApplicationMaster when a thundering herd of reducers fetch events
    // TODO: This should not be necessary after HADOOP-8942
    int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
        MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
    int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);

    // Start the map-completion events fetcher thread
    final EventFetcher<K,V> eventFetcher = 
      new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
          maxEventsToFetch);
    // 对copy阶段的记录线程启动
    eventFetcher.start();

    // Start the map-output fetcher threads
    boolean isLocal = localMapFiles != null;
    // 默认拉取数据的线程为5个
    final int numFetchers = isLocal ? 1 :
      jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
    Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
    if (isLocal) {
      fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
          merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
          localMapFiles);
      fetchers[0].start();
    } else {
      for (int i=0; i < numFetchers; ++i) {
        fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
                                       reporter, metrics, this, 
                                       reduceTask.getShuffleSecret());
        // 开始对map的拉取
        fetchers[i].start();
      }
    }

    // Wait for shuffle to complete successfully
    while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
      reporter.progress();

      synchronized (this) {
        if (throwable != null) {
          throw new ShuffleError("error in shuffle in " + throwingThreadName,
                                 throwable);
        }
      }
    }

    // Stop the event-fetcher thread
    eventFetcher.shutDown();

    // Stop the map-output fetcher threads
    for (Fetcher<K,V> fetcher : fetchers) {
      fetcher.shutDown();
    }

    // stop the scheduler
    scheduler.close();
    // 复制阶段结束,准备sort阶段
    copyPhase.complete(); 
    taskStatus.setPhase(TaskStatus.Phase.SORT);
    reduceTask.statusUpdate(umbilical);

    // Finish the on-going merges...
    RawKeyValueIterator kvIter = null;
    try {
      // 拉取之后还需进行最后的merge
      kvIter = merger.close();
    } catch (Throwable e) {
      throw new ShuffleError("Error while doing final merge " , e);
    }

    // Sanity check
    synchronized (this) {
      if (throwable != null) {
        throw new ShuffleError("error in shuffle in " + throwingThreadName,
                               throwable);
      }
    }

    return kvIter;
  }

fecher拉取线程执行

public void run() {
    try {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        MapHost host = null;
        try {
          // If merge is on, block
          merger.waitForResource();

          // Get a host to shuffle from
          host = scheduler.getHost();
          metrics.threadBusy();

          // Shuffle
          copyFromHost(host);
        } finally {
          if (host != null) {
            scheduler.freeHost(host);
            metrics.threadFree();            
          }
        }
      }
    } catch (InterruptedException ie) {
      return;
    } catch (Throwable t) {
      exceptionReporter.reportException(t);
    }
  }
最终的merge
public RawKeyValueIterator close() throws Throwable {
    // Wait for on-going merges to complete
    if (memToMemMerger != null) { 
      memToMemMerger.close();
    }
    // close方法中但凡还有数据的都等待最后一次merger的执行
    inMemoryMerger.close();
    onDiskMerger.close();

    List<InMemoryMapOutput<K, V>> memory = 
      new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
    inMemoryMergedMapOutputs.clear();
    memory.addAll(inMemoryMapOutputs);
    inMemoryMapOutputs.clear();
    List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
    onDiskMapOutputs.clear();
    // 对内存和硬盘中的数据进行merge
    return finalMerge(jobConf, rfs, memory, disk);
  }

三大线程的close方法一旦有数据就不能关闭,等待最后一次merge

public synchronized void close() throws InterruptedException {
    closed = true;
    waitForMerge();
    interrupt();
  }

最终merge:finalMerge:将内存中的数据和硬盘中的数据一并整合

private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
                                       List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
                                       List<CompressAwarePath> onDiskMapOutputs
                                       ) throws IOException {
    LOG.info("finalMerge called with " +
        inMemoryMapOutputs.size() + " in-memory map-outputs and " +
        onDiskMapOutputs.size() + " on-disk map-outputs");
    final long maxInMemReduce = getMaxInMemReduceLimit();
    // merge config params
    Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
    Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
    boolean keepInputs = job.getKeepFailedTaskFiles();
    final Path tmpDir = new Path(reduceId.toString());
    final RawComparator<K> comparator =
      (RawComparator<K>)job.getOutputKeyComparator();

    // segments required to vacate memory
    List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
    long inMemToDiskBytes = 0;
    boolean mergePhaseFinished = false;
    if (inMemoryMapOutputs.size() > 0) {
      TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
      inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
                                                memDiskSegments,
                                                maxInMemReduce);
      final int numMemDiskSegments = memDiskSegments.size();
      if (numMemDiskSegments > 0 &&
            ioSortFactor > onDiskMapOutputs.size()) {

        // If we reach here, it implies that we have less than io.sort.factor
        // disk segments and this will be incremented by 1 (result of the 
        // memory segments merge). Since this total would still be 
        // <= io.sort.factor, we will not do any more intermediate merges,
        // the merge of all these disk segments would be directly fed to the
        // reduce method

        mergePhaseFinished = true;
        // must spill to disk, but can't retain in-mem for intermediate merge
        final Path outputPath = 
          mapOutputFile.getInputFileForWrite(mapId,
                                             inMemToDiskBytes).suffix(
                                                 Task.MERGED_OUTPUT_PREFIX);
        final RawKeyValueIterator rIter = Merger.merge(job, fs,
            keyClass, valueClass, memDiskSegments, numMemDiskSegments,
            tmpDir, comparator, reporter, spilledRecordsCounter, null, 
            mergePhase);

        FSDataOutputStream out = CryptoUtils.wrapIfNecessary(job, fs.create(outputPath));
        Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,
            codec, null, true);
        try {
          Merger.writeFile(rIter, writer, reporter, job);
          writer.close();
          onDiskMapOutputs.add(new CompressAwarePath(outputPath,
              writer.getRawLength(), writer.getCompressedLength()));
          writer = null;
          // add to list of final disk outputs.
        } catch (IOException e) {
          if (null != outputPath) {
            try {
              fs.delete(outputPath, true);
            } catch (IOException ie) {
              // NOTHING
            }
          }
          throw e;
        } finally {
          if (null != writer) {
            writer.close();
          }
        }
        LOG.info("Merged " + numMemDiskSegments + " segments, " +
                 inMemToDiskBytes + " bytes to disk to satisfy " +
                 "reduce memory limit");
        inMemToDiskBytes = 0;
        memDiskSegments.clear();
      } else if (inMemToDiskBytes != 0) {
        LOG.info("Keeping " + numMemDiskSegments + " segments, " +
                 inMemToDiskBytes + " bytes in memory for " +
                 "intermediate, on-disk merge");
      }
    }

    // segments on disk
    List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
    long onDiskBytes = inMemToDiskBytes;
    long rawBytes = inMemToDiskBytes;
    CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(
        new CompressAwarePath[onDiskMapOutputs.size()]);
    for (CompressAwarePath file : onDisk) {
      long fileLength = fs.getFileStatus(file).getLen();
      onDiskBytes += fileLength;
      rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;

      LOG.debug("Disk file: " + file + " Length is " + fileLength);
      diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
                                         (file.toString().endsWith(
                                             Task.MERGED_OUTPUT_PREFIX) ?
                                          null : mergedMapOutputsCounter), file.getRawDataLength()
                                        ));
    }
    LOG.info("Merging " + onDisk.length + " files, " +
             onDiskBytes + " bytes from disk");
    Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
      public int compare(Segment<K, V> o1, Segment<K, V> o2) {
        if (o1.getLength() == o2.getLength()) {
          return 0;
        }
        return o1.getLength() < o2.getLength() ? -1 : 1;
      }
    });

    // build final list of segments from merged backed by disk + in-mem
    List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
    long inMemBytes = createInMemorySegments(inMemoryMapOutputs, 
                                             finalSegments, 0);
    LOG.info("Merging " + finalSegments.size() + " segments, " +
             inMemBytes + " bytes from memory into reduce");
    if (0 != onDiskBytes) {
      final int numInMemSegments = memDiskSegments.size();
      diskSegments.addAll(0, memDiskSegments);
      memDiskSegments.clear();
      // Pass mergePhase only if there is a going to be intermediate
      // merges. See comment where mergePhaseFinished is being set
      Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
      RawKeyValueIterator diskMerge = Merger.merge(
          job, fs, keyClass, valueClass, codec, diskSegments,
          ioSortFactor, numInMemSegments, tmpDir, comparator,
          reporter, false, spilledRecordsCounter, null, thisPhase);
      diskSegments.clear();
      if (0 == finalSegments.size()) {
        return diskMerge;
      }
      finalSegments.add(new Segment<K,V>(
            new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
    }
    return Merger.merge(job, fs, keyClass, valueClass,
                 finalSegments, finalSegments.size(), tmpDir,
                 comparator, reporter, spilledRecordsCounter, null,
                 null);

  }


过渡插曲

分组比较器的优先级高于OutputkeyComparator,后续的reduce的key比较器compartor就用这个

public RawComparator getOutputValueGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
  }


入口3:reduce阶段
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) throws IOException,InterruptedException, 
                              ClassNotFoundException {
    // wrap value iterator to report progress.
    final RawKeyValueIterator rawIter = rIter;
    // 对rIter迭代器的包装
    rIter = new RawKeyValueIterator() {
      public void close() throws IOException {
        rawIter.close();
      }
      public DataInputBuffer getKey() throws IOException {
        return rawIter.getKey();
      }
      public Progress getProgress() {
        return rawIter.getProgress();
      }
      public DataInputBuffer getValue() throws IOException {
        return rawIter.getValue();
      }
      public boolean next() throws IOException {
        boolean ret = rawIter.next();
        reporter.setProgress(rawIter.getProgress().getProgress());
        return ret;
      }
    };
    // make a task context so we can get the classes
    // taskContext的准备
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
          getTaskID(), reporter);
    // make a reducer
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
    job.setBoolean("mapred.skip.on", isSkipping());
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    // 这里创建了reduceContext【1】
    org.apache.hadoop.mapreduce.Reducer.Context 
         reducerContext = createReduceContext(reducer, job, getTaskID(),
                                               rIter, reduceInputKeyCounter, 
                                               reduceInputValueCounter, 
                                               trackedRW,
                                               committer,
                                               reporter, comparator, keyClass,
                                               valueClass);
    try {
        // 执行自定义的reduce【2】
        reducer.run(reducerContext);
    } finally {
      trackedRW.close(reducerContext);
    }
  }

1.reduceContext返回的是ReduceContextImpl方法

public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                           // 这是真正的迭代器,可以多次迭代
                           RawKeyValueIterator input, 
                           Counter inputKeyCounter,
                           Counter inputValueCounter,
                           RecordWriter<KEYOUT,VALUEOUT> output,
                           OutputCommitter committer,
                           StatusReporter reporter,
                           RawComparator<KEYIN> comparator,
                           Class<KEYIN> keyClass,
                           Class<VALUEIN> valueClass
                          ) throws InterruptedException, IOException{
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    // map输出的是序列化文件,被reduce拉取出来要反序列化
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(buffer);
    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(buffer);
    // 判断迭代器下一个有无数据
    hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
  }

2.reducer.run

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }

进入context.nextkey方法

public boolean nextKey() throws IOException,InterruptedException {
    while (hasMore && nextKeyIsSame) {
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue();
    } else {
      return false;
    }
  }

查看这个类的nextKeyValue方法:主要做了两件事,对kv进行赋值,对nextKeyIsSame更新

public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!hasMore) {
      key = null;
      value = null;
      return false;
    }
    firstValue = !nextKeyIsSame;
    // 对keyvalue进行赋值
    DataInputBuffer nextKey = input.getKey();
    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
                      nextKey.getLength() - nextKey.getPosition());
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
    key = keyDeserializer.deserialize(key);
    DataInputBuffer nextVal = input.getValue();
    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
        - nextVal.getPosition());
    value = valueDeserializer.deserialize(value);

    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
    currentValueLength = nextVal.getLength() - nextVal.getPosition();

    if (isMarked) {
      backupStore.write(nextKey, nextVal);
    }
    // 跳转到下一个元素,然后进行hasMore的更新和nextKeyIsSame的更新
    hasMore = input.next();
    if (hasMore) {
      nextKey = input.getKey();
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                     currentRawKey.getLength(),
                                     nextKey.getData(),
                                     nextKey.getPosition(),
                                     nextKey.getLength() - nextKey.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false;
    }
    inputValueCounter.increment(1);
    return true;
  }

注意到context.getValues()为生成迭代器的方法,进入返回的是另一个迭代器ValueIterator

protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {

    private boolean inReset = false;
    private boolean clearMarkFlag = false;

    @Override
    public boolean hasNext() {
      try {
        if (inReset && backupStore.hasNext()) {
          return true;
        } 
      } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException("hasNext failed", e);
      }
      return firstValue || nextKeyIsSame;
    }

    @Override
    public VALUEIN next() {
      if (inReset) {
        try {
          if (backupStore.hasNext()) {
            backupStore.next();
            DataInputBuffer next = backupStore.nextValue();
            buffer.reset(next.getData(), next.getPosition(), next.getLength()
                - next.getPosition());
            value = valueDeserializer.deserialize(value);
            return value;
          } else {
            inReset = false;
            backupStore.exitResetMode();
            if (clearMarkFlag) {
              clearMarkFlag = false;
              isMarked = false;
            }
          }
        } catch (IOException e) {
          e.printStackTrace();
          throw new RuntimeException("next value iterator failed", e);
        }
      } 

      // if this is the first record, we don't need to advance
      if (firstValue) {
        firstValue = false;
        return value;
      }
      // if this isn't the first record and the next key is different, they
      // can't advance it here.
      if (!nextKeyIsSame) {
        throw new NoSuchElementException("iterate past last value");
      }
      // otherwise, go to the next key/value pair
      try {
        // 这里调用了真正迭代器的nextKeyValue(),这个方法会更新nextKeyIsSame
        // 一组数据的迭代结束后nextKeyIsSame就为false,就无法继续迭代,所以无法进行继续更新
        nextKeyValue();
        return value;
      } catch (IOException ie) {
        throw new RuntimeException("next value iterator failed", ie);
      } catch (InterruptedException ie) {
        // this is bad, but we can't modify the exception list of java.util
        throw new RuntimeException("next value iterator interrupted", ie);        
      }
    }
reduce阶段总结

reducer当中先调while循环,条件是nextKey,nextKey调的是nextKeyValue,先对kv做了赋值然后做nextKeyIsSame预判断,判断完之后调reduce方法.

调用reduce方法会将假迭代器传进去,最终我们用到的也是假迭代器,context.nextKey()预判断,并为kv赋值。在我们程序的while循环中,会调用next值取出来,先会调nextKeyValue,调用nextKeyValue又会更新我们的nextKeyIsSame,然后取值,取值完成之后,循环回来,在while循环内又是用nextKeyIsSame来做判定的,如果第二条是一组的,就重复上面的取值,继续迭代,如果迭代到一组的边界了,下一组的数据肯定不一样了,nextKeyIsSame变成false这一条就不迭代了,所以在我们在这个假迭代器中,只能迭代一组数据。

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海牛部落-AIZero,http://hainiubl.com/topics/75333
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter