org.apache.nutch.fetcher
public class: Fetcher
java.lang.Object
org.apache.hadoop.conf.Configured
org.apache.nutch.fetcher.Fetcher
All Implemented Interfaces:
MapRunnable
A queue-based fetcher.
This fetcher uses a well-known model of one producer (a QueueFeeder)
and many consumers (FetcherThread-s).
QueueFeeder reads input fetchlists and
populates a set of FetchItemQueue-s, which hold FetchItem-s that
describe the items to be fetched. There are as many queues as there are unique
hosts, but at any given time the total number of fetch items in all queues
is bounded by a fixed limit (currently 50 times the number of fetcher
threads).
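A minimal sketch of this layout follows. It assumes a single-threaded caller and keys queues by host name only; Nutch's own FetchItemQueues is more involved. Each URL goes into the FIFO for its host, found with java.net.URI#getHost. One counter tracks the total across all queues, so adds are refused once a cap is reached. HostQueues and its methods are hypothetical names used only for illustration.

```java
import java.net.URI;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, single-threaded illustration of per-host queues with a global cap.
class HostQueues {
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();
    private final int maxTotal;   // e.g. threads * 50, as passed to QueueFeeder in run()
    private int totalSize = 0;    // items across all per-host queues

    HostQueues(int maxTotal) {
        this.maxTotal = maxTotal;
    }

    /** Adds a URL to its host's queue; returns false if the global cap is reached. */
    boolean add(String url) {
        if (totalSize >= maxTotal) {
            return false;
        }
        String host = URI.create(url).getHost();
        queues.computeIfAbsent(host, h -> new ArrayDeque<>()).addLast(url);
        totalSize++;
        return true;
    }

    /** Number of distinct host queues created so far. */
    int queueCount() {
        return queues.size();
    }

    /** Total number of queued items across all hosts. */
    int totalSize() {
        return totalSize;
    }
}
```

Suppose the cap is 3. Adding two URLs for one host and one for another creates two queues. A fourth add is refused until something is consumed.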
As items are consumed from the queues, the QueueFeeder keeps adding new
input items so that their total count stays at this limit (FetcherThread-s may
also add new items to the queues, e.g. as a result of redirection). Once all
input items are exhausted, the number of items in the queues begins to
decrease, and when it reaches 0 the fetcher finishes.
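The lifecycle described above can be sketched with java.util.concurrent primitives. This is an illustration under simplifying assumptions, not the Fetcher's code. A single bounded LinkedBlockingQueue stands in for the set of per-host queues, and its capacity plays the role of the fixed limit. FeederDemo is a hypothetical name. Consumers stop only when the feeder has finished and the queue has drained to zero.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the one-producer / many-consumers lifecycle.
// A single bounded queue stands in for Nutch's per-host FetchItemQueues.
class FeederDemo {
    static int run(List<String> input, int threads, int limit) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(limit);
        AtomicBoolean feederDone = new AtomicBoolean(false);
        AtomicInteger fetched = new AtomicInteger();

        // Producer: put() blocks while the queue is full, so the total stays <= limit.
        Thread feeder = new Thread(() -> {
            try {
                for (String url : input) {
                    queue.put(url);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                feederDone.set(true);
            }
        });

        // Consumers: exit once the input is exhausted and the queue has drained to 0.
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String url = queue.poll(10, TimeUnit.MILLISECONDS);
                        if (url != null) {
                            fetched.incrementAndGet(); // a real fetcher would fetch 'url' here
                        } else if (feederDone.get() && queue.isEmpty()) {
                            return;
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        feeder.start();

        try {
            feeder.join();
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for threads", e);
        }
        return fetched.get();
    }
}
```

For example, FeederDemo.run(urls, 4, 20) with 200 input URLs returns 200. The feeder blocks whenever 20 items are queued, and the four consumers exit only after the last item is taken.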
This fetcher implementation handles per-host blocking itself, instead
of delegating this work to protocol-specific plugins.
Each per-host queue maintains its own "politeness" settings, such as the
maximum number of concurrent requests and the crawl delay between consecutive
requests. It also tracks the requests in progress and the time the last
request finished. When FetcherThread-s ask for new items to fetch, a queue
returns an eligible item, or null if for "politeness" reasons this host's
queue is not yet ready.
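The per-host politeness rules can be sketched as a small queue that answers "not yet" by returning null. This is a simplified, hypothetical model; PoliteHostQueue is not a Nutch class. It assumes a single crawl delay applied after each finished request. It also takes the current time as a parameter, so the behaviour is deterministic and easy to test.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified per-host queue; not Nutch's FetchItemQueue.
// Times are plain milliseconds supplied by the caller.
class PoliteHostQueue {
    private final Deque<String> pending = new ArrayDeque<>();
    private final int maxConcurrent;   // max requests in progress for this host
    private final long crawlDelayMs;   // pause required after a request finishes
    private int inProgress = 0;        // requests handed out but not yet finished
    private long nextFetchTime = 0;    // earliest time the next request may start

    PoliteHostQueue(int maxConcurrent, long crawlDelayMs) {
        this.maxConcurrent = maxConcurrent;
        this.crawlDelayMs = crawlDelayMs;
    }

    void add(String url) {
        pending.addLast(url);
    }

    /** Returns the next eligible URL, or null if politeness rules say "not yet". */
    String getFetchItem(long now) {
        if (pending.isEmpty() || inProgress >= maxConcurrent || now < nextFetchTime) {
            return null;
        }
        inProgress++;
        return pending.pollFirst();
    }

    /** Records that a request finished at 'now' and schedules the next allowed start. */
    void finishFetchItem(long now) {
        inProgress--;
        nextFetchTime = now + crawlDelayMs;
    }
}
```

With maxConcurrent = 1 and a 5,000 ms delay, a request finishing at t = 1,000 ms means the next item is withheld until t = 6,000 ms.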
If there are still unfetched items in the queues but none of them are ready,
FetcherThread-s spin-wait until either some items become available or a
timeout is reached. At that point the Fetcher aborts, assuming the task is
hung. run() uses half of mapred.task.timeout, measured from the start of the
most recent request.
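A spin-wait with a deadline might look like the following hypothetical helper. It polls a source until it yields an item or the timeout elapses, sleeping briefly between attempts. For brevity, the timeout is folded into the waiting loop here. In run(), the hang check is performed by the monitoring loop instead, so treat this only as an illustration of the waiting pattern. SpinWait and awaitItem are illustrative names, not Nutch APIs.

```java
import java.util.function.Supplier;

// Hypothetical spin-wait helper; not part of Nutch.
class SpinWait {
    /**
     * Polls 'source' until it returns a non-null item or 'timeoutMs' elapses,
     * sleeping 'pauseMs' between attempts. Returns null on timeout or interrupt,
     * which a caller could treat as "assume the task is hung".
     */
    static <T> T awaitItem(Supplier<T> source, long timeoutMs, long pauseMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            T item = source.get();
            if (item != null) {
                return item;
            }
            if (System.currentTimeMillis() >= deadline) {
                return null;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }
}
```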
Author:
Andrzej Bialecki
Field Summary
| Modifier and Type | Field |
|---|---|
| public static final int | PERM_REFRESH_TIME |
| public static final String | CONTENT_REDIR |
| public static final String | PROTOCOL_REDIR |
| public static final Log | LOG |
| Fetcher.FetchItemQueues | fetchQueues |
| Fetcher.QueueFeeder | feeder |
Method Detail (from org.apache.nutch.fetcher.Fetcher):
public void close() {
}

public void configure(JobConf job) {
    setConf(job);
    this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
    this.storingContent = isStoringContent(job);
    this.parsing = isParsing(job);
    // if (job.getBoolean("fetcher.verbose", false)) {
    //     LOG.setLevel(Level.FINE);
    // }
}

public void fetch(Path segment, int threads, boolean parsing)
        throws IOException {
    checkConfiguration();
    if (LOG.isInfoEnabled()) {
        LOG.info("Fetcher: starting");
        LOG.info("Fetcher: segment: " + segment);
    }
    JobConf job = new NutchJob(getConf());
    job.setJobName("fetch " + segment);
    job.setInt("fetcher.threads.fetch", threads);
    job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
    job.setBoolean("fetcher.parse", parsing);
    // for politeness, don't permit parallel execution of a single task
    job.setSpeculativeExecution(false);
    FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
    job.setInputFormat(InputFormat.class);
    job.setMapRunnerClass(Fetcher.class);
    FileOutputFormat.setOutputPath(job, segment);
    job.setOutputFormat(FetcherOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NutchWritable.class);
    JobClient.runJob(job);
    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: done"); }
}

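The sketch below computes the values that fetch() derives from its arguments, which makes its directory layout concrete. It is illustrative only. It uses java.nio.file.Path in place of Hadoop's Path and assumes that CrawlDatum.GENERATE_DIR_NAME is "crawl_generate". The map keys other than fetcher.threads.fetch and fetcher.parse are descriptive labels, not real configuration properties. FetchJobSettings is a hypothetical class.

```java
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the settings fetch() derives from its arguments.
// Uses java.nio.file.Path instead of Hadoop's Path.
class FetchJobSettings {
    // Assumption: the value of CrawlDatum.GENERATE_DIR_NAME.
    static final String GENERATE_DIR_NAME = "crawl_generate";

    static Map<String, String> describe(Path segment, int threads, boolean parsing) {
        Map<String, String> s = new LinkedHashMap<>();
        s.put("job.name", "fetch " + segment);                        // descriptive label
        s.put("fetcher.threads.fetch", Integer.toString(threads));
        s.put("segment.name", segment.getFileName().toString());     // like Hadoop Path.getName()
        s.put("fetcher.parse", Boolean.toString(parsing));
        s.put("input.dir", segment.resolve(GENERATE_DIR_NAME).toString()); // descriptive label
        s.put("output.dir", segment.toString());                     // output goes to the segment itself
        return s;
    }
}
```

Take the segment crawl/segments/20090101. Under the assumption above, the fetch job reads from crawl/segments/20090101/crawl_generate and writes its output back into the segment directory itself.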
public static boolean isParsing(Configuration conf) {
    return conf.getBoolean("fetcher.parse", true);
}

public static boolean isStoringContent(Configuration conf) {
    return conf.getBoolean("fetcher.store.content", true);
}

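Both helpers rely on Configuration.getBoolean falling back to a default of true when the property is unset. The sketch below mimics that behaviour with java.util.Properties. FetcherFlags is a hypothetical class. Its getBoolean only approximates Hadoop's: it returns the default for any value other than "true" or "false".

```java
import java.util.Properties;

// Hypothetical stand-in for Configuration.getBoolean(name, defaultValue).
class FetcherFlags {
    static boolean getBoolean(Properties conf, String name, boolean defaultValue) {
        String v = conf.getProperty(name);
        if (v == null) {
            return defaultValue;              // unset: use the default
        }
        v = v.trim();
        if ("true".equalsIgnoreCase(v)) {
            return true;
        }
        if ("false".equalsIgnoreCase(v)) {
            return false;
        }
        return defaultValue;                  // unrecognized value: use the default
    }

    static boolean isParsing(Properties conf) {
        return getBoolean(conf, "fetcher.parse", true);
    }

    static boolean isStoringContent(Properties conf) {
        return getBoolean(conf, "fetcher.store.content", true);
    }
}
```

This is why main() writes fetcher.parse only when -noParsing is given: leaving the property unset already means parsing is enabled.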
public static void main(String[] args) throws Exception {
    String usage = "Usage: Fetcher <segment> [-threads n] [-noParsing]";
    if (args.length < 1) {
        System.err.println(usage);
        System.exit(-1);
    }
    Path segment = new Path(args[0]);
    Configuration conf = NutchConfiguration.create();
    int threads = conf.getInt("fetcher.threads.fetch", 10);
    boolean parsing = true;
    for (int i = 1; i < args.length; i++) {        // parse command line
        if (args[i].equals("-threads")) {          // found -threads option
            threads = Integer.parseInt(args[++i]);
        } else if (args[i].equals("-noParsing")) {
            parsing = false;
        }
    }
    conf.setInt("fetcher.threads.fetch", threads);
    if (!parsing) {
        conf.setBoolean("fetcher.parse", parsing);
    }
    Fetcher fetcher = new Fetcher(conf);           // make a Fetcher
    fetcher.fetch(segment, threads, parsing);      // run the Fetcher
}

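The argument loop in main() reads args[++i] without checking that a value follows -threads. A trailing -threads therefore fails with an ArrayIndexOutOfBoundsException. A dependency-free sketch with an explicit check might look like this. FetcherArgs is a hypothetical class, and like main() it silently ignores unrecognized options.

```java
// Hypothetical, dependency-free version of the argument handling in main(),
// with an explicit check that "-threads" is followed by a value.
class FetcherArgs {
    final String segment;
    final int threads;
    final boolean parsing;

    private FetcherArgs(String segment, int threads, boolean parsing) {
        this.segment = segment;
        this.threads = threads;
        this.parsing = parsing;
    }

    static FetcherArgs parse(String[] args, int defaultThreads) {
        if (args.length < 1) {
            throw new IllegalArgumentException(
                "Usage: Fetcher <segment> [-threads n] [-noParsing]");
        }
        int threads = defaultThreads;
        boolean parsing = true;
        for (int i = 1; i < args.length; i++) {
            if (args[i].equals("-threads")) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("-threads requires a value");
                }
                threads = Integer.parseInt(args[++i]);
            } else if (args[i].equals("-noParsing")) {
                parsing = false;
            }
            // any other option is ignored, as in main()
        }
        return new FetcherArgs(args[0], threads, parsing);
    }
}
```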
public void run(RecordReader input,
                OutputCollector output,
                Reporter reporter) throws IOException {
    this.output = output;
    this.reporter = reporter;
    this.fetchQueues = new FetchItemQueues(getConf());
    int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
    feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
    //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
    feeder.start();
    // set non-blocking & no-robots mode for HTTP protocol plugins.
    getConf().setBoolean(Protocol.CHECK_BLOCKING, false);
    getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
    for (int i = 0; i < threadCount; i++) {        // spawn threads
        new FetcherThread(getConf()).start();
    }
    // select a timeout that avoids a task timeout
    long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
    do {                                           // wait for threads to exit
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {}
        reportStatus();
        LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
            + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
        if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
            fetchQueues.dump();
        }
        // some requests seem to hang, despite all intentions
        if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("Aborting with " + activeThreads + " hung threads.");
            }
            return;
        }
    } while (activeThreads.get() > 0);
    LOG.info("-activeThreads=" + activeThreads);
}
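The two limits computed in run() are simple arithmetic, restated below as hypothetical helpers; RunLimits is not part of Nutch. The feeder capacity is 50 items per fetcher thread. The hang timeout is half of mapred.task.timeout, chosen so that the fetcher gives up before the task itself times out.

```java
// Hypothetical restatement of the limits used in run(); not part of Nutch.
class RunLimits {
    // Default used by run() when mapred.task.timeout is unset: 10 minutes.
    static final int DEFAULT_TASK_TIMEOUT_MS = 10 * 60 * 1000;

    /** Maximum number of items the feeder keeps queued. */
    static int feederCapacity(int threadCount) {
        return threadCount * 50;
    }

    /** Hang timeout: half the task timeout, so the fetcher aborts first. */
    static long abortTimeout(int taskTimeoutMs) {
        return taskTimeoutMs / 2;
    }

    /** True if no request has started for longer than the hang timeout. */
    static boolean shouldAbort(long nowMs, long lastRequestStartMs, int taskTimeoutMs) {
        return nowMs - lastRequestStartMs > abortTimeout(taskTimeoutMs);
    }
}
```

Take the defaults of 10 threads and a 600,000 ms task timeout. The feeder keeps up to 500 items queued. The fetcher aborts once no request has started for more than 300,000 ms (5 minutes).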