/**
 * Runs this reduce task from start to finish.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>instantiate the job's reducer and start the thread that talks to the
 *       parent;</li>
 *   <li>unless "mapred.job.tracker" is "local", fetch the map outputs with a
 *       {@code ReduceCopier};</li>
 *   <li>collect the map output files that exist on the local file system and
 *       merge them with a {@code SequenceFile.Sorter};</li>
 *   <li>feed each key group to the reducer, writing collected records through
 *       the job's output format;</li>
 *   <li>close the reducer and the record writer, then call
 *       {@code done(umbilical)}.</li>
 * </ol>
 *
 * <p>The reducer and the record writer are each closed once, whichever way the
 * reduce phase ends (normal completion, {@code IOException} or unchecked
 * exception). When the reduce phase has already failed, an {@code IOException}
 * raised while closing is ignored so that the original failure propagates.
 *
 * @param job the job configuration
 * @param umbilical the protocol used to communicate with the parent
 * @throws IOException if the reduce copier reports failure, or if merging,
 *         reducing, writing or closing fails
 */
public void run(JobConf job,
                TaskUmbilicalProtocol umbilical) throws IOException {
  Reducer reducer = (Reducer)ReflectionUtils.newInstance(
      job.getReducerClass(), job);

  // start thread that will handle communication with parent
  startCommunicationThread(umbilical);

  FileSystem lfs = FileSystem.getLocal(job);
  if (!job.get("mapred.job.tracker", "local").equals("local")) {
    reduceCopier = new ReduceCopier(umbilical, job);
    if (!reduceCopier.fetchOutputs()) {
      throw new IOException(getTaskId() + ": The reduce copier failed");
    }
  }
  copyPhase.complete();                         // copy is already complete

  // Since we don't know how many map outputs got merged in memory, we have
  // to check whether a given map output exists, and if it does, add it to
  // the list of files to merge, otherwise not.
  List<Path> mapFilesList = new ArrayList<Path>();
  for (int i = 0; i < numMaps; i++) {
    Path f;
    try {
      // catch and ignore DiskErrorException, since some map outputs will
      // really be absent (inmem merge).
      f = mapOutputFile.getInputFile(i, getTaskId());
    } catch (DiskErrorException d) {
      continue;
    }
    if (lfs.exists(f)) {
      mapFilesList.add(f);
    }
  }
  Path[] mapFiles = mapFilesList.toArray(new Path[mapFilesList.size()]);

  Path tempDir = new Path(getTaskId());

  setPhase(TaskStatus.Phase.SORT);

  final Reporter reporter = getReporter(umbilical);

  // sort the input file
  SequenceFile.Sorter sorter = new SequenceFile.Sorter(lfs,
      job.getOutputKeyComparator(), job.getMapOutputValueClass(), job);
  sorter.setProgressable(reporter);
  // the third argument is the negation of the keep-failed-task-files setting
  SequenceFile.Sorter.RawKeyValueIterator rIter =
      sorter.merge(mapFiles, tempDir, !conf.getKeepFailedTaskFiles());
  sortPhase.complete();                         // sort is complete
  setPhase(TaskStatus.Phase.REDUCE);

  // make output collector
  String finalName = getOutputName(getPartition());
  FileSystem fs = FileSystem.get(job);

  final RecordWriter out =
      job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);

  OutputCollector collector = new OutputCollector() {
      public void collect(WritableComparable key, Writable value)
        throws IOException {
        out.write(key, value);
        reporter.incrCounter(REDUCE_OUTPUT_RECORDS, 1);
        // indicate that progress update needs to be sent
        reporter.progress();
      }
    };

  // apply reduce function
  //
  // Each flag is set immediately BEFORE the corresponding close() call, so
  // the finally block below only closes what the normal path has not already
  // tried to close. This keeps every resource closed once on every exit path.
  boolean reducerCloseAttempted = false;
  boolean outCloseAttempted = false;
  try {
    Class keyClass = job.getMapOutputKeyClass();
    Class valClass = job.getMapOutputValueClass();

    ReduceValuesIterator values = new ReduceValuesIterator(rIter,
        job.getOutputValueGroupingComparator(), keyClass, valClass,
        job, reporter);
    values.informReduceProgress();
    while (values.more()) {
      reporter.incrCounter(REDUCE_INPUT_GROUPS, 1);
      reducer.reduce(values.getKey(), values, collector, reporter);
      values.nextKey();
      values.informReduceProgress();
    }

    // Normal-path clean up: failures here must reach the caller.
    reducerCloseAttempted = true;
    reducer.close();
    outCloseAttempted = true;
    out.close(reporter);
  } finally {
    // Failure-path clean up (no-op after a fully successful run). An
    // exception is already propagating, so close failures are ignored in
    // order not to hide it.
    if (!reducerCloseAttempted) {
      try {
        reducer.close();
      } catch (IOException ignored) {
        // best effort: the original failure is the one to report
      }
    }
    if (!outCloseAttempted) {
      try {
        out.close(reporter);
      } catch (IOException ignored) {
        // best effort: the original failure is the one to report
      }
    }
  }
  done(umbilical);
}
|