public void reduce(MD5Hash key,
Iterator values,
OutputCollector output,
Reporter reporter) throws IOException {
boolean highestSet = false;
while (values.hasNext()) {
IndexDoc value = values.next();
// skip already deleted
if (!value.keep) {
LOG.debug("-discard " + value + " (already marked)");
output.collect(value.url, value);
continue;
}
if (!highestSet) {
WritableUtils.cloneInto(highest, value);
highestSet = true;
continue;
}
IndexDoc toDelete = null, toKeep = null;
boolean metric = byScore ? (value.score > highest.score) :
(value.urlLen < highest.urlLen);
if (metric) {
toDelete = highest;
toKeep = value;
} else {
toDelete = value;
toKeep = highest;
}
if (LOG.isDebugEnabled()) {
LOG.debug("-discard " + toDelete + ", keep " + toKeep);
}
toDelete.keep = false;
output.collect(toDelete.url, toDelete);
WritableUtils.cloneInto(highest, toKeep);
}
LOG.debug("-keep " + highest);
// no need to add this - in phase 2 we only process docs to delete them
// highest.keep = true;
// output.collect(key, highest);
}
|