/**
 * Generates fetchlists in a segment, taking the URL-filtering switch from
 * the configuration instead of from the caller.
 *
 * <p>The switch is read from the {@code CRAWL_GENERATE_FILTER} property and
 * defaults to {@code true} when the property is absent. The lock is never
 * forced ({@code force} is passed as {@code false}).
 *
 * @param dbDir    crawl db directory
 * @param segments directory under which the new segment is created
 * @param numLists number of fetchlists to produce
 * @param topN     value handed to the selection job
 * @param curTime  time value handed to the selection job
 * @return whatever the seven-argument overload returns
 * @throws IOException if the delegated call fails
 */
public Path generate(Path dbDir,
                     Path segments,
                     int numLists,
                     long topN,
                     long curTime) throws IOException {
  final JobConf conf = new NutchJob(getConf());
  // Filtering stays on unless the configuration explicitly turns it off.
  final boolean applyFilters = conf.getBoolean(CRAWL_GENERATE_FILTER, true);
  final boolean forceLock = false;
  return generate(dbDir, segments, numLists, topN, curTime, applyFilters, forceLock);
}
Generates fetchlists in a new segment. Whether URLs are filtered is
read from the crawl.generate.filter property in the configuration
files; if the property is not set, URLs are filtered. |
/**
 * Generates fetchlists in a segment.
 *
 * <p>Runs up to three jobs while holding the crawl db lock:
 * <ol>
 *   <li>a selection job that reads the current crawl db and writes its
 *       output to a temporary directory;</li>
 *   <li>a partition job that reads that temporary directory and writes the
 *       fetchlist into a newly named segment;</li>
 *   <li>only when {@code GENERATE_UPDATE_CRAWLDB} is enabled, a job that
 *       merges the temporary output with the current crawl db and installs
 *       the result.</li>
 * </ol>
 * The lock is released and the temporary directory deleted on every exit
 * path, including failures that occur between the jobs.
 *
 * @param dbDir    crawl db directory; the lock file is created inside it
 * @param segments directory under which the new segment is created
 * @param numLists number of reduce tasks for the partition job; {@code -1}
 *                 means "use the job's number of map tasks". Forced to 1 when
 *                 {@code mapred.job.tracker} is {@code "local"}.
 * @param topN     stored in the selection job as {@code CRAWL_TOP_N};
 *                 {@code Long.MAX_VALUE} is simply not logged
 * @param curTime  stored in the selection job as {@code CRAWL_GEN_CUR_TIME}
 * @param filter   stored in the selection job as {@code CRAWL_GENERATE_FILTER}
 * @param force    passed through to {@code LockUtil.createLockFile}
 * @return the path of the generated segment, or {@code null} if the selection
 *         job produced no records
 * @throws IOException if locking, any job, or the cleanup fails
 */
public Path generate(Path dbDir,
                     Path segments,
                     int numLists,
                     long topN,
                     long curTime,
                     boolean filter,
                     boolean force) throws IOException {
  Path tempDir =
      new Path(getConf().get("mapred.temp.dir", ".") +
               "/generate-temp-" + System.currentTimeMillis());
  Path segment = new Path(segments, generateSegmentName());
  Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
  Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);
  FileSystem fs = FileSystem.get(getConf());
  LockUtil.createLockFile(fs, lock, force);

  // Everything after the lock is taken runs inside try/finally so that the
  // lock and the temporary directory are cleaned up on every exit path.
  try {
    LOG.info("Generator: Selecting best-scoring urls due for fetch.");
    LOG.info("Generator: starting");
    LOG.info("Generator: segment: " + segment);
    LOG.info("Generator: filtering: " + filter);
    if (topN != Long.MAX_VALUE) {
      LOG.info("Generator: topN: " + topN);
    }

    // map to inverted subset due for fetch, sort by score
    JobConf job = new NutchJob(getConf());
    job.setJobName("generate: select " + segment);
    if (numLists == -1) {                     // for politeness make
      numLists = job.getNumMapTasks();        // a partition per fetch task
    }
    if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {
      // override
      LOG.info("Generator: jobtracker is 'local', generating exactly one partition.");
      numLists = 1;
    }
    job.setLong(CRAWL_GEN_CUR_TIME, curTime);
    // record real generation time
    long generateTime = System.currentTimeMillis();
    job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
    job.setLong(CRAWL_TOP_N, topN);
    job.setBoolean(CRAWL_GENERATE_FILTER, filter);
    FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setMapperClass(Selector.class);
    job.setPartitionerClass(Selector.class);
    job.setReducerClass(Selector.class);
    FileOutputFormat.setOutputPath(job, tempDir);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(FloatWritable.class);
    job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
    job.setOutputValueClass(SelectorEntry.class);
    JobClient.runJob(job);

    // check that we selected at least some entries ...
    if (noEntriesSelected(job, tempDir)) {
      LOG.warn("Generator: 0 records selected for fetching, exiting ...");
      return null;
    }

    // invert again, partition by host, sort by url hash
    LOG.info("Generator: Partitioning selected urls by host, for politeness.");
    job = new NutchJob(getConf());
    job.setJobName("generate: partition " + segment);
    job.setInt("partition.url.by.host.seed", new Random().nextInt());
    FileInputFormat.addInputPath(job, tempDir);
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setMapperClass(SelectorInverseMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(SelectorEntry.class);
    job.setPartitionerClass(PartitionUrlByHost.class);
    job.setReducerClass(PartitionReducer.class);
    job.setNumReduceTasks(numLists);
    FileOutputFormat.setOutputPath(job, output);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(CrawlDatum.class);
    job.setOutputKeyComparatorClass(HashComparator.class);
    JobClient.runJob(job);

    if (getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) {
      // update the db from tempDir
      Path tempDir2 =
          new Path(getConf().get("mapred.temp.dir", ".") +
                   "/generate-temp-" + System.currentTimeMillis());
      job = new NutchJob(getConf());
      job.setJobName("generate: updatedb " + dbDir);
      job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
      FileInputFormat.addInputPath(job, tempDir);
      FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
      job.setInputFormat(SequenceFileInputFormat.class);
      job.setMapperClass(CrawlDbUpdater.class);
      job.setReducerClass(CrawlDbUpdater.class);
      job.setOutputFormat(MapFileOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(CrawlDatum.class);
      FileOutputFormat.setOutputPath(job, tempDir2);
      try {
        JobClient.runJob(job);
        CrawlDb.install(job, dbDir);
      } finally {
        // Removed whether the update succeeded or not.
        fs.delete(tempDir2, true);
      }
    }
  } finally {
    // Nested so the temporary directory is still removed if unlocking throws.
    try {
      LockUtil.removeLockFile(fs, lock);
    } finally {
      fs.delete(tempDir, true);
    }
  }

  LOG.info("Generator: done.");
  return segment;
}

/**
 * Reports whether the selection job's output in {@code dir} contains no
 * records. Every reader that was opened is closed before returning, even if
 * reading fails.
 */
private static boolean noEntriesSelected(JobConf job, Path dir) throws IOException {
  SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(job, dir);
  if (readers == null) {
    return true;
  }
  try {
    FloatWritable key = new FloatWritable();
    for (SequenceFile.Reader reader : readers) {
      if (reader.next(key)) {
        return false;
      }
    }
    return true;
  } finally {
    for (SequenceFile.Reader reader : readers) {
      reader.close();
    }
  }
}
Generates fetchlists in a new segment. Returns the path of the generated segment, or null if no records were selected for fetching. |
/**
 * Command-line entry point: parses the arguments and runs
 * {@code generate}.
 *
 * <p>Positional arguments are the crawl db directory and the segments
 * directory. Recognised options:
 * <ul>
 *   <li>{@code -topN N}: sets {@code topN} (default {@code Long.MAX_VALUE})</li>
 *   <li>{@code -numFetchers N}: sets the number of fetchlists (default -1)</li>
 *   <li>{@code -adddays N}: moves the current time forward by N days</li>
 *   <li>{@code -noFilter}: turns URL filtering off</li>
 *   <li>{@code -force}: passes {@code force = true} to {@code generate}</li>
 * </ul>
 * Unrecognised arguments are ignored.
 *
 * @param args command-line arguments
 * @return 0 on success; -2 if {@code generate} returned {@code null};
 *         -1 on a usage error, a malformed number, or any exception thrown by
 *         {@code generate}
 * @throws Exception declared by the interface; failures of {@code generate}
 *         are logged and reported through the return code instead
 */
public int run(String[] args) throws Exception {
  final String usage =
      "Usage: Generator < crawldb > < segments_dir > [-force] [-topN N] [-numFetchers numFetchers] [-adddays numDays] [-noFilter]";
  if (args.length < 2) {
    System.out.println(usage);
    return -1;
  }

  Path dbDir = new Path(args[0]);
  Path segmentsDir = new Path(args[1]);
  long curTime = System.currentTimeMillis();
  long topN = Long.MAX_VALUE;
  int numFetchers = -1;
  boolean filter = true;
  boolean force = false;

  try {
    for (int i = 2; i < args.length; i++) {
      String opt = args[i];
      boolean takesValue = "-topN".equals(opt)
          || "-numFetchers".equals(opt)
          || "-adddays".equals(opt);
      // A value-taking option given as the last argument has nothing to read.
      if (takesValue && i + 1 >= args.length) {
        System.err.println("Generator: missing value for " + opt);
        System.out.println(usage);
        return -1;
      }

      if ("-topN".equals(opt)) {
        topN = Long.parseLong(args[++i]);
      } else if ("-numFetchers".equals(opt)) {
        numFetchers = Integer.parseInt(args[++i]);
      } else if ("-adddays".equals(opt)) {
        // Skip the value as well, like the other value-taking options.
        long numDays = Integer.parseInt(args[++i]);
        curTime += numDays * 1000L * 60 * 60 * 24;
      } else if ("-noFilter".equals(opt)) {
        filter = false;
      } else if ("-force".equals(opt)) {
        force = true;
      }
    }
  } catch (NumberFormatException e) {
    System.err.println("Generator: invalid numeric argument: " + e.getMessage());
    System.out.println(usage);
    return -1;
  }

  try {
    Path seg = generate(dbDir, segmentsDir, numFetchers, topN, curTime, filter, force);
    return (seg == null) ? -2 : 0;
  } catch (Exception e) {
    // Top-level boundary: log the failure and report it via the exit code.
    LOG.fatal("Generator: " + StringUtils.stringifyException(e));
    return -1;
  }
}
|