diff --git a/src/java/org/apache/nutch/crawl/Generator.java b/src/java/org/apache/nutch/crawl/Generator.java
index 6c74aab73c..64f6daed26 100644
--- a/src/java/org/apache/nutch/crawl/Generator.java
+++ b/src/java/org/apache/nutch/crawl/Generator.java
@@ -17,26 +17,53 @@
 package org.apache.nutch.crawl;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.net.*;
-import java.util.*;
-import java.text.*;
+import java.net.URL;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 
-// rLogging imports
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.commons.jexl2.Expression;
 import org.apache.commons.jexl2.JexlContext;
 import org.apache.commons.jexl2.MapContext;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
-import org.apache.hadoop.util.*;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.MultipleInputs;
+import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.hostdb.HostDatum;
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.net.URLFilterException;
@@ -51,6 +78,9 @@
 import org.apache.nutch.util.NutchTool;
 import org.apache.nutch.util.TimingUtil;
 import org.apache.nutch.util.URLUtil;
+// Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Generates a subset of a crawl db to fetch. This version allows to generate
@@ -62,8 +92,8 @@
  **/
 public class Generator extends NutchTool implements Tool {
 
-  protected static final Logger LOG = LoggerFactory
-      .getLogger(MethodHandles.lookup().lookupClass());
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles
+      .lookup().lookupClass());
 
   public static final String GENERATE_UPDATE_CRAWLDB = "generate.update.crawldb";
   public static final String GENERATOR_MIN_SCORE = "generate.min.score";
@@ -88,23 +118,27 @@ public static class SelectorEntry implements Writable {
     public Text url;
     public CrawlDatum datum;
     public IntWritable segnum;
+    public HostDatum hostdatum;
 
     public SelectorEntry() {
       url = new Text();
       datum = new CrawlDatum();
       segnum = new IntWritable(0);
+      hostdatum = new HostDatum();
     }
 
     public void readFields(DataInput in) throws IOException {
       url.readFields(in);
       datum.readFields(in);
       segnum.readFields(in);
+      hostdatum.readFields(in);
     }
 
     public void write(DataOutput out) throws IOException {
       url.write(out);
       datum.write(out);
       segnum.write(out);
+      hostdatum.write(out);
     }
 
     public String toString() {
@@ -113,20 +147,163 @@ public String toString() {
     }
   }
 
+  public static class HostDbReaderMapper implements
+      Mapper<Text, HostDatum, FloatTextPair, SelectorEntry> {
+    @Override
+    public void configure(JobConf conf) {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void map(Text hostname, HostDatum value,
+        OutputCollector<FloatTextPair, SelectorEntry> output, Reporter reporter)
+        throws IOException {
+      SelectorEntry hostDataSelector = new SelectorEntry();
+      hostDataSelector.hostdatum = value;
+
+      // Emit the smallest possible score so that hostdatum entries can be
+      // told apart from (and sorted ahead of) entries from the crawldb mapper.
+      output.collect(new FloatTextPair(new FloatWritable(-Float.MAX_VALUE),
+          hostname), hostDataSelector);
+    }
+  }
+
+  public static class FloatTextPair implements
+      WritableComparable<FloatTextPair> {
+    public FloatWritable first;
+    public Text second;
+
+    public FloatTextPair() {
+      this.first = new FloatWritable();
+      this.second = new Text();
+    }
+
+    public FloatTextPair(FloatWritable first, Text second) {
+      this.first = first;
+      this.second = second;
+    }
+
+    public FloatTextPair(float first, String second) {
+      this.first = new FloatWritable(first);
+      this.second = new Text(second);
+    }
+
+    public FloatWritable getFirst() {
+      return first;
+    }
+
+    public void setFirst(FloatWritable first) {
+      this.first = first;
+    }
+
+    public Text getSecond() {
+      return second;
+    }
+
+    public void setSecond(Text second) {
+      this.second = second;
+    }
+
+    public void set(FloatWritable first, Text second) {
+      this.first = first;
+      this.second = second;
+    }
+
+    @Override
+    public int hashCode() {
+      return first.hashCode() * 163 + second.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof FloatTextPair) {
+        FloatTextPair tp = (FloatTextPair) obj;
+        return first.equals(tp.getFirst()) && second.equals(tp.getSecond());
+      }
+      return false;
+    }
+
+    @Override
+    public String toString() {
+      return first + "\t" + second;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      first.readFields(in);
+      second.readFields(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      first.write(out);
+      second.write(out);
+    }
+
+    @Override
+    public int compareTo(FloatTextPair tp) {
+      int cmp = first.compareTo(tp.getFirst());
+      if (cmp != 0)
+        return cmp;
+      return second.compareTo(tp.getSecond());
+    }
+  }
+
+  // This comparator "merges" hostdb data and crawldb data, see NUTCH-2455:
+  // hostdb keys sort ahead of crawldb keys, and crawldb keys sort by
+  // decreasing score.
+  // TODO: implement as a RawComparator
+  public static class ScoreHostKeyComparator extends WritableComparator {
+    protected ScoreHostKeyComparator() {
+      super(FloatTextPair.class, true);
+    }
+
+    @Override
+    public int compare(WritableComparable w1, WritableComparable w2) {
+      FloatTextPair key1 = (FloatTextPair) w1;
+      FloatTextPair key2 = (FloatTextPair) w2;
+
+      boolean isKey1HostDatum = key1.second.getLength() > 0;
+      boolean isKey2HostDatum = key2.second.getLength() > 0;
+
+      if (isKey1HostDatum && isKey2HostDatum) {
+        return key1.second.compareTo(key2.second);
+      } else if (isKey1HostDatum == isKey2HostDatum) {
+        // both keys come from the crawldb: sort by score, descending
+        return -1 * key1.first.compareTo(key2.first);
+      } else if (isKey1HostDatum) {
+        return -1;
+      } else {
+        return 1;
+      }
+    }
+  }
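To see what the comparator buys, it helps to trace the key order a reduce task observes. The walkthrough below is illustrative commentary, not part of the patch; the hostnames and scores are made up:

    // HostDbReaderMapper emits (-Float.MAX_VALUE, hostname); Selector.map
    // emits (score, "") with an empty Text. Under ScoreHostKeyComparator a
    // reduce task therefore sees all HostDatum keys first, sorted by
    // hostname, followed by all URL keys in decreasing score order:
    //
    //   (-3.4028235E38, "nutch.apache.org")   <- HostDatum
    //   (-3.4028235E38, "www.example.org")    <- HostDatum
    //   (2.5, "")                             <- URL entry, highest score first
    //   (1.0, "")                             <- URL entry
    //   (0.3, "")                             <- URL entry
    //
    // Selector.getPartition() (further down) routes a host's HostDatum and
    // its URLs to the same partition, so hostDomainCounts is filled before
    // the first URL of that host reaches reduce().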
+
   /** Selects entries due for fetch. */
   public static class Selector implements
-      Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>,
-      Partitioner<FloatWritable, Writable>,
-      Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
+      Mapper<Text, CrawlDatum, FloatTextPair, SelectorEntry>,
+      Partitioner<FloatTextPair, Writable>,
+      Reducer<FloatTextPair, SelectorEntry, FloatWritable, SelectorEntry> {
     private LongWritable genTime = new LongWritable(System.currentTimeMillis());
     private long curTime;
     private long limit;
     private long count;
-    private HashMap<String, int[]> hostCounts = new HashMap<>();
+    private HashMap<String, MutablePair<HostDatum, int[]>> hostDomainCounts = new HashMap<>();
     private int segCounts[];
     private int maxCount;
     private boolean byDomain = false;
-    private Partitioner<Text, Writable> partitioner = new URLPartitioner();
+    private URLPartitioner partitioner = new URLPartitioner();
     private URLFilters filters;
     private URLNormalizers normalizers;
     private ScoringFilters scfilters;
@@ -142,11 +319,10 @@ public static class Selector implements
     private int maxNumSegments = 1;
     private Expression expr = null;
     private int currentsegmentnum = 1;
-    private SequenceFile.Reader[] hostdbReaders = null;
     private Expression maxCountExpr = null;
     private Expression fetchDelayExpr = null;
     private JobConf conf = null;
-    
+
     public void configure(JobConf job) {
       this.conf = job;
       curTime = job.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());
@@ -177,41 +353,25 @@ public void configure(JobConf job) {
       expr = JexlUtil.parseExpression(job.get(GENERATOR_EXPR, null));
       maxNumSegments = job.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);
       segCounts = new int[maxNumSegments];
-      
+
       if (job.get(GENERATOR_HOSTDB) != null) {
-        maxCountExpr = JexlUtil.parseExpression(job.get(GENERATOR_MAX_COUNT_EXPR, null));
-        fetchDelayExpr = JexlUtil.parseExpression(job.get(GENERATOR_FETCH_DELAY_EXPR, null));
-      }
-    }
-
-    public void open() {
-      if (conf.get(GENERATOR_HOSTDB) != null) {
-        try {
-          Path path = new Path(conf.get(GENERATOR_HOSTDB), "current");
-          hostdbReaders = SequenceFileOutputFormat.getReaders(conf, path);
-        } catch (IOException e) {
-          LOG.error("Error reading HostDB because {}", e.getMessage());
-        }
+        maxCountExpr = JexlUtil.parseExpression(job.get(
+            GENERATOR_MAX_COUNT_EXPR, null));
+        fetchDelayExpr = JexlUtil.parseExpression(job.get(
+            GENERATOR_FETCH_DELAY_EXPR, null));
       }
     }
 
+    @Override
     public void close() {
-      if (hostdbReaders != null) {
-        try {
-          for (int i = 0; i < hostdbReaders.length; i++) {
-            hostdbReaders[i].close();
-          }
-        } catch (IOException e) {
-          LOG.error("Error closing HostDB because {}", e.getMessage());
-        }
-      }
     }
 
     /** Select and invert subset due for fetch. */
     public void map(Text key, CrawlDatum value,
-        OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
+        OutputCollector<FloatTextPair, SelectorEntry> output, Reporter reporter)
         throws IOException {
       Text url = key;
+
       if (filter) {
         // If filtering is on don't generate URLs that don't pass
         // URLFilters
@@ -249,7 +409,7 @@ public void map(Text key, CrawlDatum value,
           LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);
         }
       }
-      
+
       // check expr
       if (expr != null) {
         if (!crawlDatum.evaluate(expr, key.toString())) {
@@ -278,34 +438,21 @@ public void map(Text key, CrawlDatum value,
       crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
       entry.datum = crawlDatum;
       entry.url = key;
-      output.collect(sortValue, entry); // invert for sort by score
+
+      // invert for sort by score
+      output.collect(new FloatTextPair(sortValue, new Text()), entry);
     }
 
     /** Partition by host / domain or IP. */
-    public int getPartition(FloatWritable key, Writable value,
+    public int getPartition(FloatTextPair key, Writable value,
         int numReduceTasks) {
-      return partitioner.getPartition(((SelectorEntry) value).url, key,
-          numReduceTasks);
-    }
-
-    private HostDatum getHostDatum(String host) throws Exception {
-      Text key = new Text();
-      HostDatum value = new HostDatum();
-
-      open();
-      for (int i = 0; i < hostdbReaders.length; i++) {
-        while (hostdbReaders[i].next(key, value)) {
-          if (host.equals(key.toString())) {
-            close();
-            return value;
-          }
-        }
-      }
-
-      close();
-      return null;
+      return partitioner.getPartition(((SelectorEntry) value).url,
+          key.getSecond(), numReduceTasks);
     }
-    
+
     private JexlContext createContext(HostDatum datum) {
       JexlContext context = new MapContext();
       context.set("dnsFailures", datum.getDnsFailures());
@@ -317,164 +464,192 @@ private JexlContext createContext(HostDatum datum) {
       context.set("redirPerm", datum.getRedirPerm());
       context.set("gone", datum.getGone());
       context.set("conf", conf);
-      
+
       // Set metadata variables
       for (Map.Entry<Writable, Writable> entry : datum.getMetaData().entrySet()) {
         Object value = entry.getValue();
-        
+
         if (value instanceof FloatWritable) {
-          FloatWritable fvalue = (FloatWritable)value;
-          Text tkey = (Text)entry.getKey();
+          FloatWritable fvalue = (FloatWritable) value;
+          Text tkey = (Text) entry.getKey();
           context.set(tkey.toString(), fvalue.get());
         }
-        
+
         if (value instanceof IntWritable) {
-          IntWritable ivalue = (IntWritable)value;
-          Text tkey = (Text)entry.getKey();
+          IntWritable ivalue = (IntWritable) value;
+          Text tkey = (Text) entry.getKey();
           context.set(tkey.toString(), ivalue.get());
         }
-        
+
         if (value instanceof Text) {
-          Text tvalue = (Text)value;
-          Text tkey = (Text)entry.getKey();
+          Text tvalue = (Text) value;
+          Text tkey = (Text) entry.getKey();
           context.set(tkey.toString().replace("-", "_"), tvalue.toString());
         }
       }
-      
+
       return context;
     }
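The maxCountExpr/fetchDelayExpr machinery evaluates a JEXL expression against the variables that createContext() copies out of a HostDatum. Below is a minimal, self-contained sketch of that evaluation; the expression string and the dnsFailures value are invented for the example, while JexlUtil.parseExpression() and the jexl2 classes are the ones used in the code above:

    import org.apache.commons.jexl2.Expression;
    import org.apache.commons.jexl2.JexlContext;
    import org.apache.commons.jexl2.MapContext;
    import org.apache.nutch.util.JexlUtil;

    public class MaxCountExprDemo {
      public static void main(String[] args) {
        // In the Generator the expression comes from the
        // GENERATOR_MAX_COUNT_EXPR job property; this one simply disables
        // hosts with many DNS failures.
        Expression expr = JexlUtil.parseExpression("dnsFailures > 3 ? 0.0 : 100.0");

        // In the Generator this context is built by createContext(hostDatum).
        JexlContext context = new MapContext();
        context.set("dnsFailures", 1);

        // Same rounding/cast pattern as in the reducer.
        long maxCount = Math.round((double) expr.evaluate(context));
        System.out.println(maxCount); // 100
      }
    }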
 
     /** Collect until limit is reached. */
-    public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
+    public void reduce(FloatTextPair key, Iterator<SelectorEntry> values,
         OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
         throws IOException {
-
-      String hostname = null;
-      HostDatum host = null;
+
       LongWritable variableFetchDelayWritable = null; // in millis
       Text variableFetchDelayKey = new Text("_variableFetchDelay_");
       int maxCount = this.maxCount;
+      int[] hostDomainCount = null;
+      HostDatum hostDatum = null;
+
       while (values.hasNext()) {
         SelectorEntry entry = values.next();
         Text url = entry.url;
         String urlString = url.toString();
         URL u = null;
-
-        // Do this only once per queue
-        if (host == null) {
-          try {
-            hostname = URLUtil.getHost(urlString);
-            host = getHostDatum(hostname);
-          } catch (Exception e) {}
-
-          // Got it?
-          if (host == null) {
-            // Didn't work, prevent future lookups
-            host = new HostDatum();
-          } else {
-            if (maxCountExpr != null) {
-              long variableMaxCount = Math.round((double)maxCountExpr.evaluate(createContext(host)));
-              LOG.info("Generator: variable maxCount: {} for {}", variableMaxCount, hostname);
-              maxCount = (int)variableMaxCount;
-            }
-            if (fetchDelayExpr != null) {
-              long variableFetchDelay = Math.round((double)fetchDelayExpr.evaluate(createContext(host)));
-              LOG.info("Generator: variable fetchDelay: {} ms for {}", variableFetchDelay, hostname);
-              variableFetchDelayWritable = new LongWritable(variableFetchDelay);
-            }
-          }
-        }
-
-        if(maxCount == 0){
-          continue;
-        }
-
-        // Got a non-zero variable fetch delay? Add it to the datum's metadata
-        if (variableFetchDelayWritable != null) {
-          entry.datum.getMetaData().put(variableFetchDelayKey, variableFetchDelayWritable);
-        }
-        if (count == limit) {
-          // do we have any segments left?
-          if (currentsegmentnum < maxNumSegments) {
-            count = 0;
-            currentsegmentnum++;
-          } else
-            break;
-        }
-
-        String hostordomain = null;
-
-        try {
-          if (normalise && normalizers != null) {
-            urlString = normalizers.normalize(urlString,
-                URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
-          }
-          u = new URL(urlString);
-          if (byDomain) {
-            hostordomain = URLUtil.getDomainName(u);
-          } else {
-            hostordomain = new URL(urlString).getHost();
-          }
-        } catch (Exception e) {
-          LOG.warn("Malformed URL: '" + urlString + "', skipping ("
-              + StringUtils.stringifyException(e) + ")");
-          reporter.getCounter("Generator", "MALFORMED_URL").increment(1);
-          continue;
-        }
-
-        hostordomain = hostordomain.toLowerCase();
-
-        // only filter if we are counting hosts or domains
-        if (maxCount > 0) {
-          int[] hostCount = hostCounts.get(hostordomain);
-          if (hostCount == null) {
-            hostCount = new int[] { 1, 0 };
-            hostCounts.put(hostordomain, hostCount);
-          }
-
-          // increment hostCount
-          hostCount[1]++;
-
-          // check if topN reached, select next segment if it is
-          while (segCounts[hostCount[0] - 1] >= limit
-              && hostCount[0] < maxNumSegments) {
-            hostCount[0]++;
-            hostCount[1] = 0;
-          }
-
-          // reached the limit of allowed URLs per host / domain
-          // see if we can put it in the next segment?
-          if (hostCount[1] >= maxCount) {
-            if (hostCount[0] < maxNumSegments) {
-              hostCount[0]++;
-              hostCount[1] = 0;
-            } else {
-              if (hostCount[1] == maxCount + 1 && LOG.isInfoEnabled()) {
-                LOG.info("Host or domain "
-                    + hostordomain
-                    + " has more than "
-                    + maxCount
-                    + " URLs for all "
-                    + maxNumSegments
-                    + " segments. Additional URLs won't be included in the fetchlist.");
-              }
-              // skip this entry
-              continue;
-            }
-          }
-          entry.segnum = new IntWritable(hostCount[0]);
-          segCounts[hostCount[0] - 1]++;
-        } else {
-          entry.segnum = new IntWritable(currentsegmentnum);
-          segCounts[currentsegmentnum - 1]++;
-        }
-
-        output.collect(key, entry);
-
-        // Count is incremented only when we keep the URL
-        // maxCount may cause us to skip it.
-        count++;
+        String hostorDomainName = null;
+
+        // Extract hostdatum
+        if (key.second.getLength() > 0) {
+          try {
+            hostDatum = entry.hostdatum;
+            hostDomainCounts.put(key.second.toString(),
+                new MutablePair<HostDatum, int[]>(
+                    (HostDatum) hostDatum.clone(), new int[] { 1, 0 }));
+          } catch (Exception e) {
+            LOG.info("Exception while writing hostdb to hostDomainCounts: {}",
+                e.toString());
+          }
+        } else {
+          // Process normal input; the hostdatum for this host, if present,
+          // was stored in hostDomainCounts when its hostdb record was reduced
+          try {
+            u = new URL(urlString);
+
+            if (byDomain) {
+              hostorDomainName = URLUtil.getUrlRootByMode(u,
+                  URLPartitioner.PARTITION_MODE_DOMAIN).toLowerCase();
+            } else {
+              hostorDomainName = URLUtil.getUrlRootByMode(u,
+                  URLPartitioner.PARTITION_MODE_HOST).toLowerCase();
+            }
+
+            MutablePair<HostDatum, int[]> hostDomainCountPair = hostDomainCounts
+                .get(hostorDomainName);
+
+            if (hostDomainCountPair == null) {
+              hostDomainCount = new int[] { 1, 0 };
+              hostDomainCountPair = new MutablePair<HostDatum, int[]>(null,
+                  hostDomainCount);
+              hostDomainCounts.put(hostorDomainName, hostDomainCountPair);
+            } else {
+              hostDomainCount = hostDomainCountPair.getRight();
+            }
+
+            // Check hostdb expressions only for hosts, ignore domains
+            if (!byDomain)
+              hostDatum = hostDomainCountPair.getLeft();
+
+            if (hostDatum != null) {
+              if (maxCountExpr != null) {
+                long variableMaxCount = Math.round((double) maxCountExpr
+                    .evaluate(createContext(hostDatum)));
+                LOG.info("Generator: variable maxCount: {} for {}",
+                    variableMaxCount, hostorDomainName);
+                maxCount = (int) variableMaxCount;
+              }
+
+              if (fetchDelayExpr != null) {
+                long variableFetchDelay = Math.round((double) fetchDelayExpr
+                    .evaluate(createContext(hostDatum)));
+                LOG.info("Generator: variable fetchDelay: {} ms for {}",
+                    variableFetchDelay, hostorDomainName);
+                variableFetchDelayWritable = new LongWritable(
+                    variableFetchDelay);
+              }
+            }
+          } catch (Exception e) {
+            LOG.info(
+                "Exception while doing host/domain extraction and expression evaluation: {}",
+                e.toString());
+          }
+
+          if (maxCount == 0) {
+            continue;
+          }
+
+          // Got a non-zero variable fetch delay? Add it to the datum's metadata
+          if (variableFetchDelayWritable != null) {
+            entry.datum.getMetaData().put(variableFetchDelayKey,
+                variableFetchDelayWritable);
+          }
+
+          if (count == limit) {
+            // do we have any segments left?
+            if (currentsegmentnum < maxNumSegments) {
+              count = 0;
+              currentsegmentnum++;
+            } else
+              break;
+          }
+
+          try {
+            if (normalise && normalizers != null) {
+              urlString = normalizers.normalize(urlString,
+                  URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+            }
+          } catch (Exception e) {
+            LOG.warn("Malformed URL: '" + urlString + "', skipping ("
+                + StringUtils.stringifyException(e) + ")");
+            reporter.getCounter("Generator", "MALFORMED_URL").increment(1);
+            continue;
+          }
+
+          // only filter if we are counting hosts or domains
+          if (maxCount > 0) {
+            // increment hostCount
+            hostDomainCount[1]++;
+
+            // check if topN reached, select next segment if it is
+            while (segCounts[hostDomainCount[0] - 1] >= limit
+                && hostDomainCount[0] < maxNumSegments) {
+              hostDomainCount[0]++;
+              hostDomainCount[1] = 0;
+            }
+
+            // reached the limit of allowed URLs per host / domain
+            // see if we can put it in the next segment?
+            if (hostDomainCount[1] >= maxCount) {
+              if (hostDomainCount[0] < maxNumSegments) {
+                hostDomainCount[0]++;
+                hostDomainCount[1] = 0;
+              } else {
+                if (hostDomainCount[1] == maxCount + 1 && LOG.isInfoEnabled()) {
+                  LOG.info("Host or domain "
+                      + hostorDomainName
+                      + " has more than "
+                      + maxCount
+                      + " URLs for all "
+                      + maxNumSegments
+                      + " segments. Additional URLs won't be included in the fetchlist.");
+                }
+                // skip this entry
+                continue;
+              }
+            }
+            entry.segnum = new IntWritable(hostDomainCount[0]);
+            segCounts[hostDomainCount[0] - 1]++;
+          } else {
+            entry.segnum = new IntWritable(currentsegmentnum);
+            segCounts[currentsegmentnum - 1]++;
+          }
+
+          output.collect(key.first, entry);
+
+          // Count is incremented only when we keep the URL
+          // maxCount may cause us to skip it.
+          count++;
+        }
       }
     }
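The counter pair driving the segment assignment above deserves a short gloss (commentary only, not part of the patch):

    // hostDomainCount is a two-element array: {current segment for this host
    // or domain, URLs already assigned to that segment}. Once the counter
    // reaches maxCount, the current URL and the ones that follow spill into
    // the next segment; when the last of maxNumSegments segments is full as
    // well, further URLs of that host or domain are skipped, and one log
    // line is written for the first skipped URL.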
 
@@ -490,15 +665,6 @@ protected String generateFileNameForKeyValue(FloatWritable key,
   }
 
-  public static class DecreasingFloatComparator extends
-      FloatWritable.Comparator {
-
-    /** Compares two FloatWritables decreasing. */
-    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-      return super.compare(b2, s2, l2, b1, s1, l1);
-    }
-  }
-
   public static class SelectorInverseMapper extends MapReduceBase implements
       Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {
 
@@ -632,7 +798,7 @@ public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
     return generate(dbDir, segments, numLists, topN, curTime, filter, true,
         force, 1, null);
   }
-  
+
   public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
       long curTime, boolean filter, boolean norm, boolean force,
      int maxNumSegments, String expr) throws IOException {
@@ -671,7 +837,7 @@ public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
 
     FileSystem fs = tempDir.getFileSystem(getConf());
     Path lock = CrawlDb.lock(getConf(), dbDir, force);
-    
+
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
     LOG.info("Generator: starting at " + sdf.format(start));
@@ -687,7 +853,7 @@ public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
     if (expr != null) {
       LOG.info("Generator: hostdb: {}", hostdb);
     }
-    
+
     // map to inverted subset due for fetch, sort by score
     JobConf job = new NutchJob(getConf());
     job.setJobName("generate: select from " + dbDir);
@@ -713,18 +879,20 @@ public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
     }
     if (hostdb != null) {
       job.set(GENERATOR_HOSTDB, hostdb);
+      MultipleInputs.addInputPath(job, new Path(hostdb, "current"),
+          SequenceFileInputFormat.class, HostDbReaderMapper.class);
     }
 
-    FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
-    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setMapOutputKeyClass(FloatTextPair.class);
+    MultipleInputs.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME),
+        SequenceFileInputFormat.class, Selector.class);
 
-    job.setMapperClass(Selector.class);
+    job.setOutputKeyComparatorClass(ScoreHostKeyComparator.class);
     job.setPartitionerClass(Selector.class);
     job.setReducerClass(Selector.class);
 
     FileOutputFormat.setOutputPath(job, tempDir);
     job.setOutputFormat(SequenceFileOutputFormat.class);
     job.setOutputKeyClass(FloatWritable.class);
-    job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
     job.setOutputValueClass(SelectorEntry.class);
     job.setOutputFormat(GeneratorOutputFormat.class);
 
@@ -765,8 +933,8 @@ public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
 
     if (getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) {
       // update the db from tempDir
-      Path tempDir2 = new Path(dbDir,
-          "generate-temp-" + java.util.UUID.randomUUID().toString());
+      Path tempDir2 = new Path(dbDir, "generate-temp-"
+          + java.util.UUID.randomUUID().toString());
 
       job = new NutchJob(getConf());
       job.setJobName("generate: updatedb " + dbDir);
@@ -777,6 +945,7 @@ public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
       }
       FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
       job.setInputFormat(SequenceFileInputFormat.class);
+      job.setMapperClass(CrawlDbUpdater.class);
       job.setReducerClass(CrawlDbUpdater.class);
       job.setOutputFormat(MapFileOutputFormat.class);
@@ -824,7 +993,6 @@ private Path partitionSegment(Path segmentsDir, Path inputDir, int numLists)
 
     FileInputFormat.addInputPath(job, inputDir);
     job.setInputFormat(SequenceFileInputFormat.class);
-
     job.setMapperClass(SelectorInverseMapper.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(SelectorEntry.class);
@@ -920,7 +1088,8 @@ public int run(String[] args) throws Exception {
   }
 
   @Override
-  public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception {
+  public Map<String, Object> run(Map<String, Object> args, String crawlId)
+      throws Exception {
 
     Map<String, Object> results = new HashMap<>();
 
@@ -934,67 +1103,63 @@ public Map<String, Object> run(Map<String, Object> args, String crawlId)
     String expr = null;
     String hostdb = null;
     Path crawlDb;
-    
-    if(args.containsKey(Nutch.ARG_CRAWLDB)) {
+
+    if (args.containsKey(Nutch.ARG_CRAWLDB)) {
       Object crawldbPath = args.get(Nutch.ARG_CRAWLDB);
-      if(crawldbPath instanceof Path) {
+      if (crawldbPath instanceof Path) {
         crawlDb = (Path) crawldbPath;
-      }
-      else {
+      } else {
         crawlDb = new Path(crawldbPath.toString());
       }
-    }
-    else {
-      crawlDb = new Path(crawlId+"/crawldb");
+    } else {
+      crawlDb = new Path(crawlId + "/crawldb");
     }
 
     Path segmentsDir;
-    if(args.containsKey(Nutch.ARG_SEGMENTDIR)) {
+    if (args.containsKey(Nutch.ARG_SEGMENTDIR)) {
       Object segDir = args.get(Nutch.ARG_SEGMENTDIR);
-      if(segDir instanceof Path) {
+      if (segDir instanceof Path) {
         segmentsDir = (Path) segDir;
-      }
-      else {
+      } else {
         segmentsDir = new Path(segDir.toString());
       }
-    }
-    else {
-      segmentsDir = new Path(crawlId+"/segments");
+    } else {
+      segmentsDir = new Path(crawlId + "/segments");
     }
 
     if (args.containsKey(Nutch.ARG_HOSTDB)) {
-      hostdb = (String)args.get(Nutch.ARG_HOSTDB);
+      hostdb = (String) args.get(Nutch.ARG_HOSTDB);
     }
-    
+
     if (args.containsKey("expr")) {
-      expr = (String)args.get("expr");
+      expr = (String) args.get("expr");
     }
     if (args.containsKey("topN")) {
-      topN = Long.parseLong((String)args.get("topN"));
+      topN = Long.parseLong((String) args.get("topN"));
     }
     if (args.containsKey("numFetchers")) {
-      numFetchers = Integer.parseInt((String)args.get("numFetchers"));
+      numFetchers = Integer.parseInt((String) args.get("numFetchers"));
     }
     if (args.containsKey("adddays")) {
-      long numDays = Integer.parseInt((String)args.get("adddays"));
+      long numDays = Integer.parseInt((String) args.get("adddays"));
       curTime += numDays * 1000L * 60 * 60 * 24;
     }
     if (args.containsKey("noFilter")) {
       filter = false;
-    } 
+    }
     if (args.containsKey("noNorm")) {
       norm = false;
-    } 
+    }
     if (args.containsKey("force")) {
       force = true;
-    } 
+    }
     if (args.containsKey("maxNumSegments")) {
-      maxNumSegments = Integer.parseInt((String)args.get("maxNumSegments"));
+      maxNumSegments = Integer.parseInt((String) args.get("maxNumSegments"));
     }
 
     try {
       Path[] segs = generate(crawlDb, segmentsDir, numFetchers, topN, curTime,
           filter, norm, force, maxNumSegments, expr, hostdb);
-      if (segs == null){
+      if (segs == null) {
         results.put(Nutch.VAL_RESULT, Integer.toString(1));
         return results;
       }
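Before moving on to the partitioner, here is a minimal driver sketch showing how the hostdb-aware select job is switched on from code. It assumes a Generator(Configuration) constructor and relies on the eleven-argument generate() overload called from run() above; the paths and the expression are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.crawl.Generator;
    import org.apache.nutch.util.NutchConfiguration;

    public class GenerateWithHostDb {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        // Evaluated per host against the JexlContext built in
        // Selector.createContext(); the expression is an example.
        conf.set(Generator.GENERATOR_MAX_COUNT_EXPR,
            "dnsFailures > 3 ? 0.0 : 100.0");

        // The trailing argument is the hostdb path; passing it makes the
        // select job add the HostDbReaderMapper input next to the crawldb.
        new Generator(conf).generate(new Path("crawl/crawldb"),
            new Path("crawl/segments"), 2, 1000L, System.currentTimeMillis(),
            true, true, false, 1, null, "crawl/hostdb");
      }
    }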
diff --git a/src/java/org/apache/nutch/crawl/URLPartitioner.java b/src/java/org/apache/nutch/crawl/URLPartitioner.java
index 48ea799985..c651d85bd9 100644
--- a/src/java/org/apache/nutch/crawl/URLPartitioner.java
+++ b/src/java/org/apache/nutch/crawl/URLPartitioner.java
@@ -18,25 +18,26 @@
 package org.apache.nutch.crawl;
 
 import java.lang.invoke.MethodHandles;
-import java.net.InetAddress;
-import java.net.URL;
 import java.net.MalformedURLException;
+import java.net.URL;
 import java.net.UnknownHostException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
 import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.util.URLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Partition urls by host, domain name or IP depending on the value of the
  * parameter 'partition.url.mode' which can be 'byHost', 'byDomain' or 'byIP'
  */
 public class URLPartitioner implements Partitioner<Text, Writable> {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles
+      .lookup().lookupClass());
 
   public static final String PARTITION_MODE_KEY = "partition.url.mode";
 
@@ -63,35 +64,38 @@ public void configure(JobConf job) {
   public void close() {
   }
 
-  /** Hash by host or domain name or IP address. */
-  public int getPartition(Text key, Writable value, int numReduceTasks) {
+  public String getNormalizedURLRoot(Text key) {
+    URLNormalizers normalizers = this.normalizers;
     String urlString = key.toString();
     URL url = null;
-    int hashCode = 0;
     try {
-      urlString = normalizers.normalize(urlString,
-          URLNormalizers.SCOPE_PARTITION);
+      urlString = URLUtil.normalizeURL(urlString, normalizers);
       url = new URL(urlString);
     } catch (MalformedURLException e) {
       LOG.warn("Malformed URL: '" + urlString + "'");
+      return urlString;
+    }
+
+    try {
+      urlString = URLUtil.getUrlRootByMode(url, mode);
+    } catch (UnknownHostException e) {
+      Generator.LOG.info("Couldn't find IP for host: " + url.getHost());
     }
+    return urlString;
+  }
+
+  /** Hash by host or domain name or IP address. */
+  public int getPartition(Text key, Writable value, int numReduceTasks) {
+    String partitionKey;
 
-    if (url == null) {
-      // failed to parse URL, must take URL string as fall-back
-      hashCode = urlString.hashCode();
-    } else if (mode.equals(PARTITION_MODE_HOST)) {
-      hashCode = url.getHost().hashCode();
-    } else if (mode.equals(PARTITION_MODE_DOMAIN)) {
-      hashCode = URLUtil.getDomainName(url).hashCode();
-    } else if (mode.equals(PARTITION_MODE_IP)) {
-      try {
-        InetAddress address = InetAddress.getByName(url.getHost());
-        hashCode = address.getHostAddress().hashCode();
-      } catch (UnknownHostException e) {
-        Generator.LOG.info("Couldn't find IP for host: " + url.getHost());
-      }
+    if (key.toString().isEmpty()) {
+      partitionKey = value.toString();
+    } else {
+      partitionKey = getNormalizedURLRoot(key);
     }
+    int hashCode = partitionKey.hashCode();
 
     // make hosts wind up in different partitions on different runs
     hashCode ^= seed;
diff --git a/src/java/org/apache/nutch/util/URLUtil.java b/src/java/org/apache/nutch/util/URLUtil.java
index 08f5236b28..c36381a2d8 100644
--- a/src/java/org/apache/nutch/util/URLUtil.java
+++ b/src/java/org/apache/nutch/util/URLUtil.java
@@ -17,10 +17,16 @@
 package org.apache.nutch.util;
 
+import java.net.IDN;
+import java.net.InetAddress;
 import java.net.MalformedURLException;
-import java.net.*;
+import java.net.URI;
+import java.net.URL;
+import java.net.UnknownHostException;
 import java.util.regex.Pattern;
 
+import org.apache.nutch.crawl.URLPartitioner;
+import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.util.domain.DomainSuffix;
 import org.apache.nutch.util.domain.DomainSuffixes;
 
@@ -253,7 +259,8 @@ public static String[] getHostSegments(String url)
    * <p>
    * This method implements an extended version of the algorithm used by the
    * Yahoo! Slurp crawler described here:<br>
-   * <a href="http://help.yahoo.com/l/nz/yahooxtra/search/webcrawler/slurp-11.html"> How
+   * <a href=
+   * "http://help.yahoo.com/l/nz/yahooxtra/search/webcrawler/slurp-11.html"> How
    * does the Yahoo! webcrawler handle redirects?</a> <br>
    * <br>
@@ -404,13 +411,36 @@ public static String chooseRepr(String src, String dst, boolean temp) {
    * @param url
    *          The url to check.
    * @return String The hostname for the url.
    */
   public static String getHost(String url) {
     try {
-      return new URL(url).getHost().toLowerCase();
+      return URLUtil.getUrlRootByMode(new URL(url),
+          URLPartitioner.PARTITION_MODE_HOST).toLowerCase();
     } catch (MalformedURLException e) {
       return null;
+    } catch (UnknownHostException e) {
+      return null;
+    }
+  }
+
+  public static String normalizeURL(String urlString,
+      URLNormalizers normalizers) throws MalformedURLException {
+    return normalizers.normalize(urlString, URLNormalizers.SCOPE_PARTITION);
+  }
+
+  public static String getUrlRootByMode(URL url, String mode)
+      throws UnknownHostException {
+    String result = null;
+    if (mode.equals(URLPartitioner.PARTITION_MODE_HOST)) {
+      result = url.getHost();
+    } else if (mode.equals(URLPartitioner.PARTITION_MODE_DOMAIN)) {
+      result = URLUtil.getDomainName(url);
+    } else if (mode.equals(URLPartitioner.PARTITION_MODE_IP)) {
+      InetAddress address = InetAddress.getByName(url.getHost());
+      result = address.getHostAddress();
     }
+    return result;
   }
 
   /**
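A short usage sketch for the new URLUtil helper; the URL is an example and the expected outputs assume Nutch's domain-suffix rules:

    import java.net.URL;

    import org.apache.nutch.crawl.URLPartitioner;
    import org.apache.nutch.util.URLUtil;

    public class UrlRootDemo {
      public static void main(String[] args) throws Exception {
        URL u = new URL("https://lucene.apache.org/nutch/");

        // byHost keeps the full hostname; byDomain strips subdomains.
        System.out.println(URLUtil.getUrlRootByMode(u,
            URLPartitioner.PARTITION_MODE_HOST));   // lucene.apache.org
        System.out.println(URLUtil.getUrlRootByMode(u,
            URLPartitioner.PARTITION_MODE_DOMAIN)); // apache.org

        // URLPartitioner.PARTITION_MODE_IP resolves the hostname and may
        // throw UnknownHostException.
      }
    }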