public void inject(Path crawlDb,
Path urlDir) throws IOException {
if (LOG.isInfoEnabled()) {
LOG.info("Injector: starting");
LOG.info("Injector: crawlDb: " + crawlDb);
LOG.info("Injector: urlDir: " + urlDir);
}
Path tempDir =
new Path(getConf().get("mapred.temp.dir", ".") +
"/inject-temp-"+
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
// map text input file to a < url,CrawlDatum > file
if (LOG.isInfoEnabled()) {
LOG.info("Injector: Converting injected urls to crawl db entries.");
}
JobConf sortJob = new NutchJob(getConf());
sortJob.setJobName("inject " + urlDir);
FileInputFormat.addInputPath(sortJob, urlDir);
sortJob.setMapperClass(InjectMapper.class);
FileOutputFormat.setOutputPath(sortJob, tempDir);
sortJob.setOutputFormat(SequenceFileOutputFormat.class);
sortJob.setOutputKeyClass(Text.class);
sortJob.setOutputValueClass(CrawlDatum.class);
sortJob.setLong("injector.current.time", System.currentTimeMillis());
JobClient.runJob(sortJob);
// merge with existing crawl db
if (LOG.isInfoEnabled()) {
LOG.info("Injector: Merging injected urls into crawl db.");
}
JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
FileInputFormat.addInputPath(mergeJob, tempDir);
mergeJob.setReducerClass(InjectReducer.class);
JobClient.runJob(mergeJob);
CrawlDb.install(mergeJob, crawlDb);
// clean up
FileSystem fs = FileSystem.get(getConf());
fs.delete(tempDir, true);
if (LOG.isInfoEnabled()) { LOG.info("Injector: done"); }
}
|