Delete duplicate documents in a set of Lucene indexes.
Duplicates have either the same contents (via MD5 hash) or the same URL.
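This tool uses the following algorithm, run as three consecutive MapReduce jobs:
1. "dedup 1: urls by time" - documents from all input indexes are grouped by URL.
2. "dedup 2: content by hash" - the output of job 1 is grouped by MD5 content hash.
3. "dedup 3: delete from index(es)" - every document not marked to be kept is deleted from its index.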
| Method from org.apache.nutch.indexer.DeleteDuplicates Detail: |
/**
 * Output-specification check; this implementation performs no validation.
 *
 * <p>NOTE(review): presumably the output-format hook invoked before job
 * submission, since this class is registered as the output format of the
 * "dedup 3" job - confirm against the implemented interface.
 *
 * @param fs  unused
 * @param job unused
 */
public void checkOutputSpecs(FileSystem fs,
JobConf job) {
}
|
/**
 * No-op: this method does nothing.
 *
 * <p>NOTE(review): presumably the mapper/reducer lifecycle callback - confirm
 * against the implemented interface.
 */
public void close() {
}
|
/**
 * Configures this instance from the given job by delegating to
 * {@link #setConf}.
 *
 * @param job the job configuration to adopt
 */
public void configure(JobConf job) {
// A JobConf is passed where setConf expects a Configuration.
setConf(job);
}
|
/**
 * Removes duplicate documents from the given indexes by running three
 * MapReduce jobs in sequence:
 * <ol>
 *   <li>"dedup 1: urls by time" - reads the indexes and writes to a temporary
 *       {@code dedup-urls-N} directory;</li>
 *   <li>"dedup 2: content by hash" - reads that directory and writes to a
 *       temporary {@code dedup-hash-N} directory;</li>
 *   <li>"dedup 3: delete from index(es)" - reads the second directory and
 *       deletes the selected documents from the indexes.</li>
 * </ol>
 *
 * <p>Both temporary directories are removed even when one of the jobs fails,
 * so a failed run does not leave intermediate data behind. On the success
 * path the deletion order is unchanged: the first directory is removed after
 * job 2, the second after job 3.
 *
 * @param indexDirs directories containing the indexes to de-duplicate
 * @throws IOException if a job fails or a temporary directory cannot be
 *         removed; note that a failure during cleanup replaces any exception
 *         raised by the job itself
 */
public void dedup(Path[] indexDirs) throws IOException {
  if (LOG.isInfoEnabled()) { LOG.info("Dedup: starting"); }

  // One generator is enough for both random directory suffixes.
  Random random = new Random();
  Path outDir1 =
    new Path("dedup-urls-" + Integer.toString(random.nextInt(Integer.MAX_VALUE)));
  Path outDir2 =
    new Path("dedup-hash-" + Integer.toString(random.nextInt(Integer.MAX_VALUE)));

  try {
    try {
      runUrlDedupJob(indexDirs, outDir1);
      runHashDedupJob(outDir1, outDir2);
    } finally {
      // remove outDir1 - no longer needed (also covers a failed job 1 or 2)
      fs.delete(outDir1, true);
    }
    runDeleteJob(outDir2);
  } finally {
    // outDir2 may hold partial output if job 2 failed, so always remove it.
    fs.delete(outDir2, true);
  }

  if (LOG.isInfoEnabled()) { LOG.info("Dedup: done"); }
}

/** Runs job 1 ("dedup 1: urls by time") over all index directories, writing to outDir. */
private void runUrlDedupJob(Path[] indexDirs, Path outDir) throws IOException {
  JobConf job = new NutchJob(getConf());
  for (Path indexDir : indexDirs) {
    if (LOG.isInfoEnabled()) {
      LOG.info("Dedup: adding indexes in: " + indexDir);
    }
    FileInputFormat.addInputPath(job, indexDir);
  }
  job.setJobName("dedup 1: urls by time");
  job.setInputFormat(InputFormat.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IndexDoc.class);
  job.setReducerClass(UrlsReducer.class);
  FileOutputFormat.setOutputPath(job, outDir);
  job.setOutputKeyClass(MD5Hash.class);
  job.setOutputValueClass(IndexDoc.class);
  job.setOutputFormat(SequenceFileOutputFormat.class);
  JobClient.runJob(job);
}

/** Runs job 2 ("dedup 2: content by hash"), reading inDir and writing to outDir. */
private void runHashDedupJob(Path inDir, Path outDir) throws IOException {
  JobConf job = new NutchJob(getConf());
  job.setJobName("dedup 2: content by hash");
  FileInputFormat.addInputPath(job, inDir);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapOutputKeyClass(MD5Hash.class);
  job.setMapOutputValueClass(IndexDoc.class);
  job.setPartitionerClass(HashPartitioner.class);
  job.setSpeculativeExecution(false);
  job.setReducerClass(HashReducer.class);
  FileOutputFormat.setOutputPath(job, outDir);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IndexDoc.class);
  job.setOutputFormat(SequenceFileOutputFormat.class);
  JobClient.runJob(job);
}

/** Runs job 3 ("dedup 3: delete from index(es)"), with this class as mapper, reducer and output format. */
private void runDeleteJob(Path inDir) throws IOException {
  JobConf job = new NutchJob(getConf());
  job.setJobName("dedup 3: delete from index(es)");
  FileInputFormat.addInputPath(job, inDir);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setInt("io.file.buffer.size", 4096);
  job.setMapperClass(DeleteDuplicates.class);
  job.setReducerClass(DeleteDuplicates.class);
  job.setOutputFormat(DeleteDuplicates.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  JobClient.runJob(job);
}
|
/**
 * Returns a record writer that accepts no records: its {@code write} always
 * throws {@link UnsupportedOperationException} and its {@code close} does
 * nothing.
 *
 * @param fs       unused
 * @param job      unused
 * @param name     unused
 * @param progress unused
 * @return a writer that rejects every write
 * @throws IOException declared by the signature; never thrown by this body
 */
public RecordWriter getRecordWriter(FileSystem fs,
    JobConf job,
    String name,
    Progressable progress) throws IOException {
  final RecordWriter<WritableComparable, Writable> rejectingWriter =
      new RecordWriter<WritableComparable, Writable>() {
        public void close(Reporter reporter) throws IOException {
          // nothing to release
        }

        public void write(WritableComparable key, Writable value)
            throws IOException {
          throw new UnsupportedOperationException();
        }
      };
  return rejectingWriter;
}
|
/**
 * Command-line entry point: runs a new {@code DeleteDuplicates} through
 * {@code ToolRunner} with a freshly created Nutch configuration and exits the
 * JVM with the returned status code.
 *
 * @param args command-line arguments, passed through unchanged
 * @throws Exception if {@code ToolRunner.run} throws
 */
public static void main(String[] args) throws Exception {
  System.exit(
      ToolRunner.run(NutchConfiguration.create(), new DeleteDuplicates(), args));
}
|
/**
 * Maps [*,IndexDoc] pairs to [index,doc] pairs.
 *
 * <p>Documents flagged {@code keep} produce no output; every other document
 * is emitted keyed by its index, with its document number as the value.
 *
 * @param key      ignored
 * @param value    the record to examine; cast to {@code IndexDoc}
 * @param output   receives one [index,doc] pair per document to delete
 * @param reporter unused
 * @throws IOException if the collector fails
 */
public void map(WritableComparable key,
    Writable value,
    OutputCollector output,
    Reporter reporter) throws IOException {
  final IndexDoc candidate = (IndexDoc) value;
  // Only documents that are NOT marked to be kept are forwarded for deletion.
  if (!candidate.keep) {
    final IntWritable docNumber = new IntWritable(candidate.doc);
    output.collect(candidate.index, docNumber);
  }
}
Map [*,IndexDoc] pairs to [index,doc] pairs. |
/**
 * Deletes the documents named in {@code values} from the index named in
 * {@code key}.
 *
 * <p>The index is opened once, every listed document number is deleted, and
 * the reader is closed in a {@code finally} block so it is released even if a
 * deletion fails.
 *
 * @param key      path of the index to modify
 * @param values   document numbers ({@code IntWritable}) to delete
 * @param output   unused; this reducer emits nothing
 * @param reporter unused
 * @throws IOException if the index cannot be opened, modified or closed
 */
public void reduce(Text key,
    Iterator values,
    OutputCollector output,
    Reporter reporter) throws IOException {
  Path index = new Path(key.toString());
  IndexReader reader = IndexReader.open(new FsDirectory(fs, index, false, getConf()));
  // Checked once up front so the per-document message is only built when
  // debug logging is actually on (matches the isInfoEnabled guards in dedup).
  final boolean debugEnabled = LOG.isDebugEnabled();
  try {
    while (values.hasNext()) {
      // Explicit cast: the iterator is raw in this signature, so next()
      // yields Object.
      IntWritable value = (IntWritable) values.next();
      if (debugEnabled) {
        LOG.debug("-delete " + index + " doc=" + value);
      }
      reader.deleteDocument(value.get());
    }
  } finally {
    reader.close();
  }
}
Delete docs named in values from index named in key. |
/**
 * Tool entry point: treats every argument as an index path and de-duplicates
 * those indexes.
 *
 * @param args one or more index paths
 * @return {@code 0} on success; {@code -1} if no arguments were given or if
 *         de-duplication threw an exception (which is logged at fatal level)
 * @throws Exception declared by the signature; exceptions from {@code dedup}
 *         are caught and reported through the return value
 */
public int run(String[] args) throws Exception {
  final int argCount = args.length;
  if (argCount == 0) {
    System.err.println("Usage: DeleteDuplicates < indexes > ...");
    return -1;
  }

  // Convert each command-line argument into a Path, preserving order.
  final Path[] indexPaths = new Path[argCount];
  int slot = 0;
  for (String arg : args) {
    indexPaths[slot++] = new Path(arg);
  }

  int exitCode;
  try {
    dedup(indexPaths);
    exitCode = 0;
  } catch (Exception e) {
    LOG.fatal("DeleteDuplicates: " + StringUtils.stringifyException(e));
    exitCode = -1;
  }
  return exitCode;
}
|
/**
 * Stores the configuration via the superclass and, when it is non-null,
 * obtains the matching {@code FileSystem} into the {@code fs} field.
 *
 * @param conf the configuration to adopt; may be {@code null}, in which case
 *        {@code fs} is left untouched
 * @throws RuntimeException wrapping the {@code IOException} if the file
 *         system cannot be obtained
 */
public void setConf(Configuration conf) {
  super.setConf(conf);
  if (conf == null) {
    return;
  }
  try {
    fs = FileSystem.get(conf);
  } catch (IOException ioe) {
    throw new RuntimeException(ioe);
  }
}
|