/**
 * Collects entries until the overall limit is reached.
 *
 * <p>Each entry's URL is parsed, optionally rewritten to use the resolved IP
 * address of its host (when {@code byIP} is set), normalized with the
 * {@code URLNormalizers.SCOPE_GENERATE_HOST_COUNT} scope and, when
 * {@code maxPerHost > 0}, checked against a per-host cap. Entries that fail any
 * of these steps are skipped and do not count towards {@code limit}.
 *
 * @param key      key passed through unchanged to {@code output} for every kept entry
 * @param values   entries to consider; each element is expected to be a
 *                 {@code SelectorEntry}
 * @param output   receives the {@code (key, entry)} pairs that are kept
 * @param reporter not used by this method
 * @throws IOException if {@code output.collect} fails, or if rebuilding the
 *                     IP-based URL in {@code byIP} mode is rejected as malformed
 */
public void reduce(FloatWritable key,
Iterator values,
OutputCollector output,
Reporter reporter) throws IOException {
  while (values.hasNext() && count < limit) {
    // Explicit cast: the iterator is declared without a type argument here.
    SelectorEntry entry = (SelectorEntry) values.next();
    Text url = entry.url;
    String urlString = url.toString();
    URL u = null;
    // skip urls that cannot be parsed (an empty url has no protocol and is
    // rejected here as well)
    try {
      u = new URL(urlString);
    } catch (MalformedURLException e) {
      LOG.info("Bad protocol in url: " + urlString);
      continue;
    }
    // Host names are case-insensitive; use a locale-independent lower-casing so
    // the result does not depend on the JVM default locale.
    String host = u.getHost().toLowerCase(java.util.Locale.ROOT);
    // Raw (unresolved) host name, used as key for the maxed/failed host caches.
    String hostname = host;
    // partitioning by ip will generate lots of DNS requests here, and will
    // be up to double the overall dns load, do not run this way unless you
    // are running a local caching DNS server or a two layer DNS cache
    if (byIP) {
      // Both checks avoid a DNS lookup for hosts whose outcome is already known.
      if (maxedHosts.contains(host)) {
        if (LOG.isDebugEnabled()) { LOG.debug("Host already maxed out: " + host); }
        continue;
      }
      if (dnsFailureHosts.contains(host)) {
        if (LOG.isDebugEnabled()) { LOG.debug("Host name lookup already failed: " + host); }
        continue;
      }
      try {
        InetAddress ia = InetAddress.getByName(host);
        host = ia.getHostAddress();
        // Rebuild the URL with the IP in place of the host name so that the
        // normalization and counting below operate on the IP.
        urlString = new URL(u.getProtocol(), host, u.getPort(), u.getFile()).toString();
      } catch (UnknownHostException uhe) {
        // remember hostnames that could not be looked up
        dnsFailureHosts.add(hostname);
        if (LOG.isDebugEnabled()) {
          LOG.debug("DNS lookup failed: " + host + ", skipping.");
        }
        dnsFailure++;
        // Emit a running total every 1000 failures.
        if ((dnsFailure % 1000 == 0) && (LOG.isWarnEnabled())) {
          LOG.warn("DNS failures: " + dnsFailure);
        }
        continue;
      }
    }
    try {
      urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
      // Lower-case the counting key so the same host spelled with different
      // case shares one counter.
      host = new URL(urlString).getHost().toLowerCase(java.util.Locale.ROOT);
    } catch (Exception e) {
      LOG.warn("Malformed URL: '" + urlString + "', skipping (" +
          StringUtils.stringifyException(e) + ")");
      continue;
    }
    // only filter if we are counting hosts
    if (maxPerHost > 0) {
      IntWritable hostCount = hostCounts.get(host);
      if (hostCount == null) {
        hostCount = new IntWritable();
        hostCounts.put(host, hostCount);
      }
      // increment hostCount
      hostCount.set(hostCount.get() + 1);
      // skip URL if above the limit per host.
      if (hostCount.get() > maxPerHost) {
        // Only the first URL over the cap records the host and logs, so the
        // message appears once per host.
        if (hostCount.get() == maxPerHost + 1) {
          // remember the raw hostname that is maxed out
          maxedHosts.add(hostname);
          if (LOG.isInfoEnabled()) {
            LOG.info("Host " + host + " has more than " + maxPerHost +
                " URLs." + " Skipping additional.");
          }
        }
        continue;
      }
    }
    output.collect(key, entry);
    // Count is incremented only when we keep the URL
    // maxPerHost may cause us to skip it.
    count++;
  }
}
Collect until limit is reached.