diff --git a/src/java/org/apache/nutch/crawl/Generator.java b/src/java/org/apache/nutch/crawl/Generator.java index 102ce39b94..c7f3078a39 100644 --- a/src/java/org/apache/nutch/crawl/Generator.java +++ b/src/java/org/apache/nutch/crawl/Generator.java @@ -22,6 +22,7 @@ import java.lang.invoke.MethodHandles; import java.net.MalformedURLException; import java.net.URL; +import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; @@ -33,6 +34,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.time.StopWatch; +import org.apache.commons.lang3.tuple.MutablePair; import org.apache.hadoop.conf.Configurable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +47,7 @@ import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.StringUtils; @@ -57,6 +60,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; @@ -89,6 +93,44 @@ * selected for fetching. The URLs are partitioned by IP, domain or host within * a segment. We can choose separately how to count the URLs i.e. by domain or * host to limit the entries. + * + *

+ * <h4>HostDb Integration (NUTCH-2455)</h4>

+ *

+ * When configured with a HostDb (via {@code -hostdb} option or + * {@code generate.hostdb} property), the Generator can apply per-host settings + * using JEXL expressions: + *

+ * + * + *

+ * <h4>Performance Characteristics</h4>

+ *

+ * The HostDb integration uses secondary sorting via MapReduce to efficiently + * merge HostDb entries with CrawlDb entries. This approach has the following + * performance benefits compared to loading the entire HostDb into memory: + *

+ * + * + *

+ * <h4>Backward Compatibility</h4>

+ *

+ * When {@code generate.hostdb} is not configured, the Generator operates without + * HostDb integration, using only the default {@code generate.max.count} setting. + * JEXL expressions ({@code generate.max.count.expr}, {@code generate.fetch.delay.expr}) + * are only evaluated when a HostDb is provided. + *

+ * + * @see org.apache.nutch.hostdb.UpdateHostDb + * @see org.apache.nutch.hostdb.HostDatum **/ public class Generator extends NutchTool implements Tool { @@ -116,15 +158,25 @@ public class Generator extends NutchTool implements Tool { public static final String GENERATOR_MAX_COUNT_EXPR = "generate.max.count.expr"; public static final String GENERATOR_FETCH_DELAY_EXPR = "generate.fetch.delay.expr"; + /** + * Selector entry holds URL, CrawlDatum, segment number, and optionally HostDatum. + * Used to carry data through the MapReduce pipeline. + * HostDatum is only serialized when hasHostDatum is true, to avoid + * serialization overhead when HostDb is not used. + */ public static class SelectorEntry implements Writable { public Text url; public CrawlDatum datum; public IntWritable segnum; + public BooleanWritable hasHostDatum; + public HostDatum hostdatum; public SelectorEntry() { url = new Text(); datum = new CrawlDatum(); segnum = new IntWritable(0); + hasHostDatum = new BooleanWritable(false); + hostdatum = new HostDatum(); } @Override @@ -132,6 +184,10 @@ public void readFields(DataInput in) throws IOException { url.readFields(in); datum.readFields(in); segnum.readFields(in); + hasHostDatum.readFields(in); + if (hasHostDatum.get()) { + hostdatum.readFields(in); + } } @Override @@ -139,6 +195,10 @@ public void write(DataOutput out) throws IOException { url.write(out); datum.write(out); segnum.write(out); + hasHostDatum.write(out); + if (hasHostDatum.get()) { + hostdatum.write(out); + } } @Override @@ -148,35 +208,245 @@ public String toString() { } } - /** Selects entries due for fetch. */ + /** + * Composite key for secondary sorting. Contains score and hostname. + * Used to ensure HostDb entries arrive before CrawlDb entries in the reducer. 
+ * + * For HostDb entries: FloatTextPair(-Float.MAX_VALUE, hostname) + * For CrawlDb entries: FloatTextPair(score, "") + */ + public static class FloatTextPair implements WritableComparable { + public FloatWritable first; + public Text second; + + public FloatTextPair() { + this.first = new FloatWritable(); + this.second = new Text(); + } + + public FloatTextPair(FloatWritable first, Text second) { + this.first = first; + this.second = second; + } + + public FloatTextPair(float first, String second) { + this.first = new FloatWritable(first); + this.second = new Text(second); + } + + public FloatWritable getFirst() { + return first; + } + + public void setFirst(FloatWritable first) { + this.first = first; + } + + public Text getSecond() { + return second; + } + + public void setSecond(Text second) { + this.second = second; + } + + public void set(FloatWritable first, Text second) { + this.first = first; + this.second = second; + } + + @Override + public int hashCode() { + return first.hashCode() * 163 + second.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof FloatTextPair) { + FloatTextPair tp = (FloatTextPair) obj; + return first.equals(tp.getFirst()) && second.equals(tp.getSecond()); + } + return false; + } + + @Override + public String toString() { + return first + "\t" + second; + } + + @Override + public void readFields(DataInput in) throws IOException { + first.readFields(in); + second.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + first.write(out); + second.write(out); + } + + @Override + public int compareTo(FloatTextPair tp) { + int cmp = first.compareTo(tp.getFirst()); + if (cmp != 0) + return cmp; + return second.compareTo(tp.getSecond()); + } + } + + /** + * Comparator that ensures HostDb entries (with hostname in second field) + * sort before CrawlDb entries (with empty second field) for the same host. + * This enables the secondary sorting pattern for NUTCH-2455. 
+ */ + public static class ScoreHostKeyComparator extends WritableComparator { + protected ScoreHostKeyComparator() { + super(FloatTextPair.class, true); + } + + @Override + public int compare(WritableComparable w1, WritableComparable w2) { + FloatTextPair key1 = (FloatTextPair) w1; + FloatTextPair key2 = (FloatTextPair) w2; + + boolean isKey1HostDatum = key1.second.getLength() > 0; + boolean isKey2HostDatum = key2.second.getLength() > 0; + + if (isKey1HostDatum && isKey2HostDatum) { + // Both are HostDb entries, sort by hostname + return key1.second.compareTo(key2.second); + } else { + if (isKey1HostDatum == isKey2HostDatum) { + // Both are CrawlDb entries, sort by score descending + return -1 * key1.first.compareTo(key2.first); + } else if (isKey1HostDatum) { + // HostDb entries come before CrawlDb entries + return -1; + } else { + return 1; + } + } + } + } + + /** + * Mapper that reads HostDb and emits entries for secondary sorting. + * Uses a very low score (-Float.MAX_VALUE) to ensure HostDb entries + * sort before CrawlDb entries. + */ + public static class HostDbReaderMapper + extends Mapper { + + @Override + public void map(Text hostname, HostDatum value, Context context) + throws IOException, InterruptedException { + SelectorEntry hostDataSelector = new SelectorEntry(); + hostDataSelector.hasHostDatum.set(true); + try { + hostDataSelector.hostdatum = (HostDatum) value.clone(); + } catch (CloneNotSupportedException e) { + hostDataSelector.hostdatum = value; + } + + // Use very low score and hostname to ensure HostDb entries + // sort before CrawlDb entries for the same host + context.write(new FloatTextPair(new FloatWritable(-Float.MAX_VALUE), + hostname), hostDataSelector); + } + } + + /** Selects entries due for fetch. Partitions by host/domain/IP (with HostDb support). 
*/ + public static class SelectorWithHostDb extends Partitioner + implements Configurable { + + private final URLPartitioner partitioner = new URLPartitioner(); + private Configuration conf; + private int seed; + private String mode = URLPartitioner.PARTITION_MODE_HOST; + + /** + * Partition by host / domain or IP. + * For HostDb entries (key.second is non-empty), partition by the hostname. + * For CrawlDb entries (key.second is empty), use the URLPartitioner. + * + * Note: When partition.url.mode is not "byHost", HostDb entries may not + * be correctly grouped with their corresponding CrawlDb entries. For best + * results, use "byHost" mode (the default) when using HostDb. + */ + @Override + public int getPartition(FloatTextPair key, Writable value, + int numReduceTasks) { + SelectorEntry entry = (SelectorEntry) value; + // For HostDb entries, use the hostname from the key + // For CrawlDb entries, use the URL from the entry + if (key.second.getLength() > 0) { + // HostDb entry - partition by hostname + // This works best with "byHost" mode; other modes may cause + // HostDb entries to not be grouped with their CrawlDb entries + int hashCode = key.second.toString().hashCode(); + hashCode ^= seed; + return (hashCode & Integer.MAX_VALUE) % numReduceTasks; + } + return partitioner.getPartition(entry.url, key.first, numReduceTasks); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + this.seed = conf.getInt("partition.url.seed", 0); + this.mode = conf.get(URLPartitioner.PARTITION_MODE_KEY, + URLPartitioner.PARTITION_MODE_HOST); + partitioner.setConf(conf); + + // Warn if mode is not byHost and hostdb is configured + String hostdb = conf.get(GENERATOR_HOSTDB); + if (hostdb != null && !mode.equals(URLPartitioner.PARTITION_MODE_HOST)) { + LOG.warn("HostDb is configured but partition.url.mode is '{}'. " + + "For correct HostDb integration, use 'byHost' mode (the default). 
" + + "Other modes may cause HostDb entries to not be grouped with their CrawlDb entries.", + mode); + } + } + } + + /** + * Selects entries due for fetch. Partitions by host/domain/IP (original, without HostDb). + * Uses FloatWritable key for optimized raw byte comparison. + */ public static class Selector extends Partitioner implements Configurable { private final URLPartitioner partitioner = new URLPartitioner(); + private Configuration conf; - /** Partition by host / domain or IP. */ @Override public int getPartition(FloatWritable key, Writable value, int numReduceTasks) { - return partitioner.getPartition(((SelectorEntry) value).url, key, - numReduceTasks); + SelectorEntry entry = (SelectorEntry) value; + return partitioner.getPartition(entry.url, key, numReduceTasks); } @Override public Configuration getConf() { - return partitioner.getConf(); + return conf; } @Override public void setConf(Configuration conf) { + this.conf = conf; partitioner.setConf(conf); } } - /** Select and invert subset due for fetch. */ - - public static class SelectorMapper - extends Mapper { + /** Select and invert subset due for fetch (with HostDb secondary sorting). 
*/ + public static class SelectorMapperWithHostDb + extends Mapper { private LongWritable genTime = new LongWritable(System.currentTimeMillis()); private long curTime; @@ -206,9 +476,7 @@ public static class SelectorMapper private Counter urlsSkippedPerHostOverflowCounter; @Override - public void setup( - Mapper.Context context) - throws IOException { + public void setup(Context context) throws IOException { conf = context.getConfiguration(); curTime = conf.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis()); filters = new URLFilters(conf); @@ -329,21 +597,169 @@ public void map(Text key, CrawlDatum value, Context context) return; } - // sort by decreasing score, using DecreasingFloatComparator + // sort by decreasing score, using ScoreHostKeyComparator sortValue.set(sort); // record generation time crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime); entry.datum = crawlDatum; entry.url = key; - context.write(sortValue, entry); // invert for sort by score + + // CrawlDb entries have empty hostname (second field) + context.write(new FloatTextPair(sortValue, new Text()), entry); } } - /** Collect until limit is reached. */ - public static class SelectorReducer extends - Reducer { + /** + * Select and invert subset due for fetch (original, without HostDb). + * Uses FloatWritable key for optimized raw byte comparison during sorting. 
+ */ + public static class SelectorMapper + extends Mapper { + + private LongWritable genTime = new LongWritable(System.currentTimeMillis()); + private long curTime; + private Configuration conf; + private URLFilters filters; + private ScoringFilters scfilters; + private SelectorEntry entry = new SelectorEntry(); + private FloatWritable sortValue = new FloatWritable(); + private boolean filter; + private long genDelay; + private FetchSchedule schedule; + private float scoreThreshold = 0f; + private int intervalThreshold = -1; + private byte restrictStatus = -1; + private JexlScript expr = null; + private ErrorTracker errorTracker; + + // Cached counter references for performance + private Counter urlFiltersRejectedCounter; + private Counter scheduleRejectedCounter; + private Counter waitForUpdateCounter; + private Counter exprRejectedCounter; + private Counter statusRejectedCounter; + private Counter scoreTooLowCounter; + private Counter intervalRejectedCounter; + + @Override + public void setup(Context context) throws IOException { + conf = context.getConfiguration(); + curTime = conf.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis()); + filters = new URLFilters(conf); + scfilters = new ScoringFilters(conf); + filter = conf.getBoolean(GENERATOR_FILTER, true); + genDelay = conf.getLong(GENERATOR_DELAY, 604800000L); + long time = conf.getLong(Nutch.GENERATE_TIME_KEY, 0L); + if (time > 0) + genTime.set(time); + schedule = FetchScheduleFactory.getFetchSchedule(conf); + scoreThreshold = conf.getFloat(GENERATOR_MIN_SCORE, Float.NaN); + intervalThreshold = conf.getInt(GENERATOR_MIN_INTERVAL, -1); + String restrictStatusString = conf.getTrimmed(GENERATOR_RESTRICT_STATUS, + ""); + if (!restrictStatusString.isEmpty()) { + restrictStatus = CrawlDatum.getStatusByName(restrictStatusString); + } + expr = JexlUtil.parseExpression(conf.get(GENERATOR_EXPR, null)); + errorTracker = new ErrorTracker(NutchMetrics.GROUP_GENERATOR, context); + initCounters(context); + } + + 
private void initCounters(Context context) { + urlFiltersRejectedCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_URL_FILTERS_REJECTED_TOTAL); + scheduleRejectedCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_SCHEDULE_REJECTED_TOTAL); + waitForUpdateCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_WAIT_FOR_UPDATE_TOTAL); + exprRejectedCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_EXPR_REJECTED_TOTAL); + statusRejectedCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_STATUS_REJECTED_TOTAL); + scoreTooLowCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_SCORE_TOO_LOW_TOTAL); + intervalRejectedCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_INTERVAL_REJECTED_TOTAL); + } + + @Override + public void map(Text key, CrawlDatum value, Context context) + throws IOException, InterruptedException { + Text url = key; + if (filter) { + try { + if (filters.filter(url.toString()) == null) { + urlFiltersRejectedCounter.increment(1); + return; + } + } catch (URLFilterException e) { + errorTracker.incrementCounters(e); + LOG.warn("Couldn't filter url: {} ({})", url, e.getMessage()); + } + } + CrawlDatum crawlDatum = value; + + if (!schedule.shouldFetch(url, crawlDatum, curTime)) { + LOG.debug("-shouldFetch rejected '{}', fetchTime={}, curTime={}", url, + crawlDatum.getFetchTime(), curTime); + scheduleRejectedCounter.increment(1); + return; + } + + LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData() + .get(Nutch.WRITABLE_GENERATE_TIME_KEY); + if (oldGenTime != null) { + if (oldGenTime.get() + genDelay > curTime) { + waitForUpdateCounter.increment(1); + return; + } + } + float sort = 1.0f; + try { + sort = scfilters.generatorSortValue(key, crawlDatum, sort); + } catch (ScoringFilterException sfe) { + 
errorTracker.incrementCounters(sfe); + LOG.warn("Couldn't filter generatorSortValue for {}: {}", key, sfe); + } + + if (expr != null) { + if (!crawlDatum.execute(expr, key.toString())) { + exprRejectedCounter.increment(1); + return; + } + } + + if (restrictStatus != -1 && restrictStatus != crawlDatum.getStatus()) { + statusRejectedCounter.increment(1); + return; + } + + if (!Float.isNaN(scoreThreshold) && sort < scoreThreshold) { + scoreTooLowCounter.increment(1); + return; + } + + if (intervalThreshold != -1 + && crawlDatum.getFetchInterval() > intervalThreshold) { + intervalRejectedCounter.increment(1); + return; + } + + sortValue.set(sort); + crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime); + entry.datum = crawlDatum; + entry.url = key; + + // Emit FloatWritable directly for optimized sorting + context.write(sortValue, entry); + } + } - private HashMap hostCounts = new HashMap<>(); + /** Collect until limit is reached (with HostDb support). */ + public static class SelectorReducerWithHostDb extends + Reducer { + + private HashMap> hostDomainCounts = new HashMap<>(); private long count; private int currentsegmentnum = 1; private MultipleOutputs mos; @@ -356,40 +772,13 @@ public static class SelectorReducer extends private boolean byDomain = false; private URLNormalizers normalizers; private static boolean normalise; - private SequenceFile.Reader[] hostdbReaders = null; private JexlScript maxCountExpr = null; private JexlScript fetchDelayExpr = null; - private Map hostDatumCache = new HashMap<>(); private ErrorTracker errorTracker; - + // Cached counter references for performance private Counter hostsAffectedPerHostOverflowCounter; private Counter urlsSkippedPerHostOverflowCounter; - - public void readHostDb() throws IOException { - if (conf.get(GENERATOR_HOSTDB) == null) { - return; - } - - Path path = new Path(conf.get(GENERATOR_HOSTDB), "current"); - hostdbReaders = SegmentReaderUtil.getReaders(path, conf); - - try { - Text key = new 
Text(); - HostDatum value = new HostDatum(); - for (int i = 0; i < hostdbReaders.length; i++) { - while (hostdbReaders[i].next(key, value)) { - hostDatumCache.put(key.toString(), (HostDatum)value.clone()); - } - } - } catch (Exception e) { - throw new IOException(e); - } - - for (int i = 0; i < hostdbReaders.length; i++) { - hostdbReaders[i].close(); - } - } private JexlContext createContext(HostDatum datum) { JexlContext context = new MapContext(); @@ -458,12 +847,10 @@ public void setup(Context context) throws IOException { fetchDelayExpr = JexlUtil .parseExpression(conf.get(GENERATOR_FETCH_DELAY_EXPR, null)); } - // Initialize error tracker with cached counters + // Initialize error tracker errorTracker = new ErrorTracker(NutchMetrics.GROUP_GENERATOR, context); // Initialize cached counter references initReducerCounters(context); - - readHostDb(); } /** @@ -483,59 +870,105 @@ public void cleanup(Context context) } @Override - public void reduce(FloatWritable key, Iterable values, + public void reduce(FloatTextPair key, Iterable values, Context context) throws IOException, InterruptedException { - String currentHostname = null; - HostDatum host = null; LongWritable variableFetchDelayWritable = null; // in millis Text variableFetchDelayKey = new Text("_variableFetchDelay_"); // local variable maxCount may hold host-specific count set in HostDb int maxCount = this.maxCount; + int[] hostDomainCount = null; + HostDatum hostDatum = null; + for (SelectorEntry entry : values) { Text url = entry.url; String urlString = url.toString(); URL u = null; + String hostorDomainName = null; + + // Check if this is a HostDb entry (hostname in key.second) + if (key.second.getLength() > 0) { + // This is a HostDb entry - store it for later use + try { + hostDatum = (HostDatum) entry.hostdatum.clone(); + hostDomainCounts.put(key.second.toString(), + new MutablePair(hostDatum, new int[] { 1, 0 })); + } catch (Exception e) { + LOG.info("Exception while storing hostdb entry: {}", 
e.toString()); + } + // Don't process HostDb entries as URLs + continue; + } + + // Process normal CrawlDb entry + try { + u = new URL(urlString); + + if (byDomain) { + hostorDomainName = URLUtil.getUrlRootByMode(u, + URLPartitioner.PARTITION_MODE_DOMAIN).toLowerCase(); + } else { + hostorDomainName = URLUtil.getUrlRootByMode(u, + URLPartitioner.PARTITION_MODE_HOST).toLowerCase(); + } + + MutablePair hostDomainCountPair = hostDomainCounts + .get(hostorDomainName); - String hostname = URLUtil.getHost(urlString); - if (hostname == null) { - currentHostname = hostname; - // malformed URLs are counted later on when extracting host or domain - } else if (!hostname.equals(currentHostname)) { - currentHostname = hostname; - host = hostDatumCache.get(hostname); + if (hostDomainCountPair == null) { + hostDomainCount = new int[] { 1, 0 }; + hostDomainCountPair = new MutablePair(null, + hostDomainCount); + hostDomainCounts.put(hostorDomainName, hostDomainCountPair); + } else { + hostDomainCount = hostDomainCountPair.getRight(); + } - // Got it? 
- if (host != null) { + // Check hostdb expressions only for host, ignore domains + if (!byDomain) + hostDatum = hostDomainCountPair.getLeft(); + + if (hostDatum != null) { if (maxCountExpr != null) { try { - long variableMaxCount = Math - .round((double) maxCountExpr.execute(createContext(host))); + Object result = maxCountExpr.execute(createContext(hostDatum)); + long variableMaxCount = ((Number) result).longValue(); LOG.debug("Generator: variable maxCount: {} for {}", - variableMaxCount, hostname); + variableMaxCount, hostorDomainName); maxCount = (int) variableMaxCount; } catch (Exception e) { LOG.error( - "Unable to execute variable maxCount expression because: {}", - e.getMessage(), e); + "Unable to execute variable maxCount expression: {}", + e.getMessage()); } } if (fetchDelayExpr != null) { try { - long variableFetchDelay = Math.round( - (double) fetchDelayExpr.execute(createContext(host))); + Object result = fetchDelayExpr.execute(createContext(hostDatum)); + long variableFetchDelay = ((Number) result).longValue(); LOG.debug("Generator: variable fetchDelay: {} ms for {}", - variableFetchDelay, hostname); - variableFetchDelayWritable = new LongWritable( - variableFetchDelay); + variableFetchDelay, hostorDomainName); + variableFetchDelayWritable = new LongWritable(variableFetchDelay); } catch (Exception e) { LOG.error( - "Unable to execute fetch delay expression because: {}", - e.getMessage(), e); + "Unable to execute fetch delay expression: {}", + e.getMessage()); } } } + } catch (UnknownHostException e) { + LOG.warn("Unknown host for URL: {}", urlString); + continue; + } catch (MalformedURLException e) { + LOG.warn("Malformed URL: '{}', skipping ({})", urlString, + StringUtils.stringifyException(e)); + errorTracker.incrementCounters(e); + continue; + } + + if (maxCount == 0) { + continue; } // Got a non-zero variable fetch delay? 
Add it to the datum's metadata @@ -553,19 +986,11 @@ public void reduce(FloatWritable key, Iterable values, break; } - String hostordomain = null; - try { if (normalise && normalizers != null) { urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_GENERATE_HOST_COUNT); } - u = new URL(urlString); - if (byDomain) { - hostordomain = URLUtil.getDomainName(u); - } else { - hostordomain = u.getHost(); - } } catch (MalformedURLException e) { LOG.warn("Malformed URL: '{}', skipping ({})", urlString, StringUtils.stringifyException(e)); @@ -573,53 +998,45 @@ public void reduce(FloatWritable key, Iterable values, continue; } - hostordomain = hostordomain.toLowerCase(); - // only filter if we are counting hosts or domains if (maxCount > 0) { - int[] hostCount = hostCounts.get(hostordomain); - if (hostCount == null) { - hostCount = new int[] { 1, 0 }; - hostCounts.put(hostordomain, hostCount); - } - // increment hostCount - hostCount[1]++; + hostDomainCount[1]++; // check if topN reached, select next segment if it is - while (segCounts[hostCount[0] - 1] >= limit - && hostCount[0] < maxNumSegments) { - hostCount[0]++; - hostCount[1] = 0; + while (segCounts[hostDomainCount[0] - 1] >= limit + && hostDomainCount[0] < maxNumSegments) { + hostDomainCount[0]++; + hostDomainCount[1] = 0; } // reached the limit of allowed URLs per host / domain // see if we can put it in the next segment? - if (hostCount[1] > maxCount) { - if (hostCount[0] < maxNumSegments) { - hostCount[0]++; - hostCount[1] = 1; + if (hostDomainCount[1] > maxCount) { + if (hostDomainCount[0] < maxNumSegments) { + hostDomainCount[0]++; + hostDomainCount[1] = 1; } else { - if (hostCount[1] == (maxCount+1)) { + if (hostDomainCount[1] == (maxCount + 1)) { hostsAffectedPerHostOverflowCounter.increment(1); LOG.info( "Host or domain {} has more than {} URLs for all {} segments. 
Additional URLs won't be included in the fetchlist.", - hostordomain, maxCount, maxNumSegments); + hostorDomainName, maxCount, maxNumSegments); } // skip this entry urlsSkippedPerHostOverflowCounter.increment(1); continue; } } - entry.segnum = new IntWritable(hostCount[0]); - segCounts[hostCount[0] - 1]++; + entry.segnum = new IntWritable(hostDomainCount[0]); + segCounts[hostDomainCount[0] - 1]++; } else { entry.segnum = new IntWritable(currentsegmentnum); segCounts[currentsegmentnum - 1]++; } outputFile = generateFileName(entry); - mos.write("sequenceFiles", key, entry, outputFile); + mos.write("sequenceFiles", key.first, entry, outputFile); // Count is incremented only when we keep the URL // maxCount may cause us to skip it. @@ -632,6 +1049,173 @@ private String generateFileName(SelectorEntry entry) { } } + /** + * Collect until limit is reached (original, without HostDb support). + * Uses FloatWritable key for optimized raw byte comparison. + */ + public static class SelectorReducer extends + Reducer { + + private HashMap hostDomainCounts = new HashMap<>(); + private long count; + private int currentsegmentnum = 1; + private MultipleOutputs mos; + private String outputFile; + private long limit; + private int segCounts[]; + private int maxNumSegments = 1; + private int maxCount; + private Configuration conf; + private boolean byDomain = false; + private URLNormalizers normalizers; + private static boolean normalise; + private ErrorTracker errorTracker; + + // Cached counter references for performance + private Counter hostsAffectedPerHostOverflowCounter; + private Counter urlsSkippedPerHostOverflowCounter; + + @Override + public void setup(Context context) throws IOException { + conf = context.getConfiguration(); + mos = new MultipleOutputs(context); + Job job = Job.getInstance(conf, "Nutch Generator.SelectorReducer"); + limit = conf.getLong(GENERATOR_TOP_N, Long.MAX_VALUE) + / job.getNumReduceTasks(); + maxNumSegments = conf.getInt(GENERATOR_MAX_NUM_SEGMENTS, 
1); + segCounts = new int[maxNumSegments]; + maxCount = conf.getInt(GENERATOR_MAX_COUNT, -1); + if (maxCount == -1) { + byDomain = false; + } + if (GENERATOR_COUNT_VALUE_DOMAIN.equals(conf.get(GENERATOR_COUNT_MODE))) + byDomain = true; + normalise = conf.getBoolean(GENERATOR_NORMALISE, true); + if (normalise) + normalizers = new URLNormalizers(conf, + URLNormalizers.SCOPE_GENERATE_HOST_COUNT); + errorTracker = new ErrorTracker(NutchMetrics.GROUP_GENERATOR, context); + initReducerCounters(context); + } + + private void initReducerCounters(Context context) { + hostsAffectedPerHostOverflowCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_HOSTS_AFFECTED_PER_HOST_OVERFLOW_TOTAL); + urlsSkippedPerHostOverflowCounter = context.getCounter( + NutchMetrics.GROUP_GENERATOR, NutchMetrics.GENERATOR_URLS_SKIPPED_PER_HOST_OVERFLOW_TOTAL); + } + + @Override + public void cleanup(Context context) + throws IOException, InterruptedException { + mos.close(); + } + + @Override + public void reduce(FloatWritable key, Iterable values, + Context context) throws IOException, InterruptedException { + + for (SelectorEntry entry : values) { + Text url = entry.url; + String urlString = url.toString(); + URL u = null; + String hostorDomainName = null; + int[] hostDomainCount = null; + + try { + u = new URL(urlString); + + if (byDomain) { + hostorDomainName = URLUtil.getUrlRootByMode(u, + URLPartitioner.PARTITION_MODE_DOMAIN).toLowerCase(); + } else { + hostorDomainName = URLUtil.getUrlRootByMode(u, + URLPartitioner.PARTITION_MODE_HOST).toLowerCase(); + } + + hostDomainCount = hostDomainCounts.get(hostorDomainName); + + if (hostDomainCount == null) { + hostDomainCount = new int[] { 1, 0 }; + hostDomainCounts.put(hostorDomainName, hostDomainCount); + } + } catch (UnknownHostException e) { + LOG.warn("Unknown host for URL: {}", urlString); + continue; + } catch (MalformedURLException e) { + LOG.warn("Malformed URL: '{}', skipping ({})", urlString, + 
StringUtils.stringifyException(e)); + errorTracker.incrementCounters(e); + continue; + } + + if (maxCount == 0) { + continue; + } + + if (count == limit) { + if (currentsegmentnum < maxNumSegments) { + count = 0; + currentsegmentnum++; + } else + break; + } + + try { + if (normalise && normalizers != null) { + urlString = normalizers.normalize(urlString, + URLNormalizers.SCOPE_GENERATE_HOST_COUNT); + } + } catch (MalformedURLException e) { + LOG.warn("Malformed URL: '{}', skipping ({})", urlString, + StringUtils.stringifyException(e)); + errorTracker.incrementCounters(e); + continue; + } + + if (maxCount > 0) { + hostDomainCount[1]++; + + while (segCounts[hostDomainCount[0] - 1] >= limit + && hostDomainCount[0] < maxNumSegments) { + hostDomainCount[0]++; + hostDomainCount[1] = 0; + } + + if (hostDomainCount[1] > maxCount) { + if (hostDomainCount[0] < maxNumSegments) { + hostDomainCount[0]++; + hostDomainCount[1] = 1; + } else { + if (hostDomainCount[1] == (maxCount + 1)) { + hostsAffectedPerHostOverflowCounter.increment(1); + LOG.info( + "Host or domain {} has more than {} URLs for all {} segments. 
Additional URLs won't be included in the fetchlist.", + hostorDomainName, maxCount, maxNumSegments); + } + urlsSkippedPerHostOverflowCounter.increment(1); + continue; + } + } + entry.segnum = new IntWritable(hostDomainCount[0]); + segCounts[hostDomainCount[0] - 1]++; + } else { + entry.segnum = new IntWritable(currentsegmentnum); + segCounts[currentsegmentnum - 1]++; + } + + outputFile = generateFileName(entry); + mos.write("sequenceFiles", key, entry, outputFile); + + count++; + } + } + + private String generateFileName(SelectorEntry entry) { + return "fetchlist-" + entry.segnum.toString() + "/part"; + } + } + public static class DecreasingFloatComparator extends FloatWritable.Comparator { @@ -984,20 +1568,43 @@ public Path[] generate(Path dbDir, Path segments, int numLists, long topN, if (expr != null) { conf.set(GENERATOR_EXPR, expr); } + + // Configure job based on whether HostDb is used if (hostdb != null) { + // HostDb path: Use secondary sorting with FloatTextPair key + LOG.info("Generator: using HostDb integration with secondary sorting"); conf.set(GENERATOR_HOSTDB, hostdb); + + // Use MultipleInputs to read from both HostDb and CrawlDb + MultipleInputs.addInputPath(job, new Path(hostdb, "current"), + SequenceFileInputFormat.class, HostDbReaderMapper.class); + MultipleInputs.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME), + SequenceFileInputFormat.class, SelectorMapperWithHostDb.class); + + job.setJarByClass(SelectorWithHostDb.class); + job.setMapOutputKeyClass(FloatTextPair.class); + job.setPartitionerClass(SelectorWithHostDb.class); + job.setReducerClass(SelectorReducerWithHostDb.class); + job.setSortComparatorClass(ScoreHostKeyComparator.class); + } else { + // Original path: Use FloatWritable key with optimized raw byte comparison + LOG.info("Generator: using standard generation (no HostDb)"); + + FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME)); + job.setInputFormatClass(SequenceFileInputFormat.class); + 
job.setMapperClass(SelectorMapper.class); + + job.setJarByClass(Selector.class); + job.setMapOutputKeyClass(FloatWritable.class); + job.setPartitionerClass(Selector.class); + job.setReducerClass(SelectorReducer.class); + job.setSortComparatorClass(DecreasingFloatComparator.class); } - FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME)); - job.setInputFormatClass(SequenceFileInputFormat.class); - - job.setJarByClass(Selector.class); - job.setMapperClass(SelectorMapper.class); - job.setPartitionerClass(Selector.class); - job.setReducerClass(SelectorReducer.class); - + + // Common configuration + job.setMapOutputValueClass(SelectorEntry.class); FileOutputFormat.setOutputPath(job, tempDir); job.setOutputKeyClass(FloatWritable.class); - job.setSortComparatorClass(DecreasingFloatComparator.class); job.setOutputValueClass(SelectorEntry.class); MultipleOutputs.addNamedOutput(job, "sequenceFiles", SequenceFileOutputFormat.class, FloatWritable.class, diff --git a/src/java/org/apache/nutch/crawl/URLPartitioner.java b/src/java/org/apache/nutch/crawl/URLPartitioner.java index cd1140a698..bdd294b5ef 100644 --- a/src/java/org/apache/nutch/crawl/URLPartitioner.java +++ b/src/java/org/apache/nutch/crawl/URLPartitioner.java @@ -71,6 +71,33 @@ public Configuration getConf() { return conf; } + /** + * Returns the normalized URL root (host, domain, or IP) for the given URL key. + * This is used for partitioning URLs by their root component. + * + * @param key + * The URL as a Text object. + * @return The normalized URL root based on the partition mode. 
+ */ + public String getNormalizedURLRoot(Text key) { + String urlString = key.toString(); + URL url = null; + try { + urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_PARTITION); + url = new URL(urlString); + } catch (MalformedURLException e) { + LOG.warn("Malformed URL: '{}'", urlString); + return urlString; + } + + try { + urlString = URLUtil.getUrlRootByMode(url, mode); + } catch (UnknownHostException e) { + LOG.info("Couldn't find IP for host: {}", url.getHost()); + } + return urlString; + } + /** Hash by host or domain name or IP address. */ @Override public int getPartition(Text key, Writable value, int numReduceTasks) { diff --git a/src/java/org/apache/nutch/util/URLUtil.java b/src/java/org/apache/nutch/util/URLUtil.java index 44c6309d2a..13e0dfad7c 100644 --- a/src/java/org/apache/nutch/util/URLUtil.java +++ b/src/java/org/apache/nutch/util/URLUtil.java @@ -17,12 +17,16 @@ package org.apache.nutch.util; import java.net.IDN; +import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; +import java.net.UnknownHostException; import java.util.Locale; import java.util.regex.Pattern; +import org.apache.nutch.crawl.URLPartitioner; + import crawlercommons.domains.EffectiveTldFinder; /** Utility class for URL analysis */ @@ -488,6 +492,34 @@ public static String getHost(URL url) { return url.getHost().toLowerCase(Locale.ROOT); } + /** + * Returns the URL root (host, domain, or IP) based on the specified partition mode. + * This method centralizes the logic for extracting the appropriate URL component + * for partitioning purposes. + * + * @param url + * The URL to extract the root from. + * @param mode + * The partition mode: {@link URLPartitioner#PARTITION_MODE_HOST}, + * {@link URLPartitioner#PARTITION_MODE_DOMAIN}, or + * {@link URLPartitioner#PARTITION_MODE_IP}. + * @return The URL root based on the mode, or null if mode is unrecognized. 
+ * @throws UnknownHostException + * if mode is byIP and the host cannot be resolved. + */ + public static String getUrlRootByMode(URL url, String mode) + throws UnknownHostException { + if (mode.equals(URLPartitioner.PARTITION_MODE_HOST)) { + return url.getHost(); + } else if (mode.equals(URLPartitioner.PARTITION_MODE_DOMAIN)) { + return URLUtil.getDomainName(url); + } else if (mode.equals(URLPartitioner.PARTITION_MODE_IP)) { + InetAddress address = InetAddress.getByName(url.getHost()); + return address.getHostAddress(); + } + return null; + } + /** * Returns the page for the url. The page consists of the protocol, host, and * path, but does not include the query string. The host is lowercased but the diff --git a/src/test/org/apache/nutch/crawl/TestGenerator.java b/src/test/org/apache/nutch/crawl/TestGenerator.java index cf4fa49558..a6da20c1c9 100644 --- a/src/test/org/apache/nutch/crawl/TestGenerator.java +++ b/src/test/org/apache/nutch/crawl/TestGenerator.java @@ -19,10 +19,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.Reader.Option; import org.apache.hadoop.io.Text; import org.apache.nutch.crawl.CrawlDBTestUtil.URLCrawlDatum; +import org.apache.nutch.hostdb.HostDatum; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -31,9 +34,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Basic generator 
test. 1. Insert entries in crawldb 2. Generates entries to @@ -49,6 +57,8 @@ public class TestGenerator { Path segmentsDir; + Path hostDbDir; + FileSystem fs; final static Path testdir = new Path("build/test/generator-test"); @@ -441,4 +451,216 @@ private URLCrawlDatum createURLCrawlDatum(final String url, return new CrawlDBTestUtil.URLCrawlDatum(new Text(url), new CrawlDatum( CrawlDatum.STATUS_DB_UNFETCHED, fetchInterval, score)); } + + /** + * Creates a HostDb with the given hostname -> HostDatum entries. + * Uses SequenceFile format as expected by Generator. + * + * @param hostData + * Map of hostname to HostDatum + * @throws IOException + */ + private void createHostDb(Map hostData) throws IOException { + hostDbDir = new Path(testdir, "hostdb"); + Path dir = new Path(hostDbDir, "current"); + fs.mkdirs(dir); + + // Use SequenceFile as expected by Generator's MultipleInputs + SequenceFile.Writer.Option fileOpt = SequenceFile.Writer.file(new Path(dir, "part-r-00000")); + SequenceFile.Writer.Option keyOpt = SequenceFile.Writer.keyClass(Text.class); + SequenceFile.Writer.Option valueOpt = SequenceFile.Writer.valueClass(HostDatum.class); + SequenceFile.Writer writer = SequenceFile.createWriter(conf, fileOpt, keyOpt, valueOpt); + + // Sort hostnames alphabetically for consistency + ArrayList sortedHosts = new ArrayList<>(hostData.keySet()); + Collections.sort(sortedHosts); + + for (String hostname : sortedHosts) { + writer.append(new Text(hostname), hostData.get(hostname)); + } + writer.close(); + } + + /** + * Generate Fetchlist with HostDb integration. 
+ * + * @param numResults + * number of results to generate + * @param config + * Configuration to use + * @param filter + * whether to apply URL filters + * @param hostdb + * path to HostDb (can be null) + * @return path to generated segment + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + */ + private Path generateFetchlistWithHostDb(int numResults, Configuration config, + boolean filter, Path hostdb) throws IOException, ClassNotFoundException, InterruptedException { + Generator g = new Generator(config); + String hostdbPath = (hostdb != null) ? hostdb.toString() : null; + Path[] generatedSegment = g.generate(dbDir, segmentsDir, -1, numResults, + Long.MAX_VALUE, filter, false, false, 1, null, hostdbPath); + if (generatedSegment == null) + return null; + return generatedSegment[0]; + } + + /** + * Test that Generator correctly integrates with HostDb using JEXL expressions. + * This test verifies: + * 1. HostDb entries are read and processed before CrawlDb entries + * 2. Variable fetch delays from JEXL expressions are applied + * 3. 
Variable max count from JEXL expressions are applied + * + * @throws Exception + */ + @Test + public void testGeneratorWithHostDb() throws Exception { + // Create CrawlDb with URLs from different hosts + ArrayList list = new ArrayList(); + list.add(createURLCrawlDatum("http://www.example.com/page1.html", 1, 1)); + list.add(createURLCrawlDatum("http://www.example.com/page2.html", 1, 1)); + list.add(createURLCrawlDatum("http://www.example.com/page3.html", 1, 1)); + list.add(createURLCrawlDatum("http://www.other.com/page1.html", 1, 1)); + + createCrawlDB(list); + + // Create HostDb with host-specific settings + Map hostData = new HashMap<>(); + + // Create HostDatum for www.example.com with high fetch count + HostDatum exampleDatum = new HostDatum(1.0f, new Date()); + exampleDatum.setFetched(100); // Set fetched count for JEXL expression + hostData.put("www.example.com", exampleDatum); + + // Create HostDatum for www.other.com with lower fetch count + HostDatum otherDatum = new HostDatum(0.5f, new Date()); + otherDatum.setFetched(10); + hostData.put("www.other.com", otherDatum); + + createHostDb(hostData); + + // Configure generator with JEXL expression for max count + // This expression limits hosts with <50 fetched pages to 1 URL per generate + Configuration myConfiguration = new Configuration(conf); + myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 10); // Default max + myConfiguration.set(Generator.GENERATOR_MAX_COUNT_EXPR, "fetched < 50 ? 
1 : 10"); + + Path generatedSegment = generateFetchlistWithHostDb(Integer.MAX_VALUE, + myConfiguration, false, hostDbDir); + + assertNotNull(generatedSegment, "Generated segment should not be null"); + + Path fetchlistPath = new Path(new Path(generatedSegment, + CrawlDatum.GENERATE_DIR_NAME), "part-r-00000"); + + ArrayList fetchList = readContents(fetchlistPath); + + // Count URLs per host + int exampleCount = 0; + int otherCount = 0; + for (URLCrawlDatum entry : fetchList) { + String url = entry.url.toString(); + if (url.contains("www.example.com")) { + exampleCount++; + } else if (url.contains("www.other.com")) { + otherCount++; + } + } + + // www.example.com has fetched=100, so max is 10 + // www.other.com has fetched=10, so max is 1 (from JEXL expression) + assertTrue(exampleCount <= 10, "example.com should have at most 10 URLs"); + assertEquals(1, otherCount, "other.com should have exactly 1 URL due to JEXL max count expression"); + } + + /** + * Test that Generator correctly applies variable fetch delay from HostDb. 
+ * + * @throws Exception + */ + @Test + public void testGeneratorWithHostDbFetchDelay() throws Exception { + // Create CrawlDb with URLs (must be in alphabetical order for MapFile) + ArrayList list = new ArrayList(); + list.add(createURLCrawlDatum("http://www.delayhost.com/page1.html", 1, 1)); + + createCrawlDB(list); + + // Create HostDb with connection failure data + Map hostData = new HashMap<>(); + + // delayhost has many connection failures - should get longer delay + HostDatum delayDatum = new HostDatum(1.0f, new Date()); + delayDatum.setConnectionFailures(100L); + hostData.put("www.delayhost.com", delayDatum); + + createHostDb(hostData); + + // Configure generator with JEXL expression for fetch delay + // This simple expression just returns a constant based on connection failures + Configuration myConfiguration = new Configuration(conf); + myConfiguration.set(Generator.GENERATOR_FETCH_DELAY_EXPR, "5000"); + + Path generatedSegment = generateFetchlistWithHostDb(Integer.MAX_VALUE, + myConfiguration, false, hostDbDir); + + assertNotNull(generatedSegment, "Generated segment should not be null"); + + Path fetchlistPath = new Path(new Path(generatedSegment, + CrawlDatum.GENERATE_DIR_NAME), "part-r-00000"); + + ArrayList fetchList = readContents(fetchlistPath); + + assertEquals(1, fetchList.size(), "Should have exactly one URL in fetch list"); + + // Verify variable fetch delay is set in the CrawlDatum metadata + Text fetchDelayKey = new Text("_variableFetchDelay_"); + URLCrawlDatum entry = fetchList.get(0); + LongWritable fetchDelay = (LongWritable) entry.datum.getMetaData().get(fetchDelayKey); + + assertNotNull(fetchDelay, "URL should have variable fetch delay set when HostDb is configured"); + assertEquals(5000L, fetchDelay.get(), "Fetch delay should be 5000ms"); + } + + /** + * Test that Generator works correctly without HostDb (backward compatibility). 
+ * + * @throws Exception + */ + @Test + public void testGeneratorWithoutHostDb() throws Exception { + // Create CrawlDb with URLs + ArrayList list = new ArrayList(); + list.add(createURLCrawlDatum("http://www.example.com/page1.html", 1, 1)); + list.add(createURLCrawlDatum("http://www.example.com/page2.html", 1, 1)); + list.add(createURLCrawlDatum("http://www.example.com/page3.html", 1, 1)); + + createCrawlDB(list); + + // Configure generator WITHOUT HostDb but WITH JEXL expressions + // The expressions should be ignored since there's no HostDb + Configuration myConfiguration = new Configuration(conf); + myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 2); // Limit to 2 per host + myConfiguration.set(Generator.GENERATOR_MAX_COUNT_EXPR, "1"); // Would be 1 if HostDb present + + // Generate without HostDb + Path generatedSegment = generateFetchlistWithHostDb(Integer.MAX_VALUE, + myConfiguration, false, null); + + assertNotNull(generatedSegment, "Generated segment should not be null"); + + Path fetchlistPath = new Path(new Path(generatedSegment, + CrawlDatum.GENERATE_DIR_NAME), "part-r-00000"); + + ArrayList fetchList = readContents(fetchlistPath); + + // Without HostDb, the JEXL expression should be ignored and + // generate.max.count (2) should be used + assertEquals(2, fetchList.size(), + "Without HostDb, should use generate.max.count not JEXL expression"); + } } diff --git a/src/test/org/apache/nutch/crawl/TestGeneratorSecondarySort.java b/src/test/org/apache/nutch/crawl/TestGeneratorSecondarySort.java new file mode 100644 index 0000000000..ab0559ad11 --- /dev/null +++ b/src/test/org/apache/nutch/crawl/TestGeneratorSecondarySort.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.crawl; + +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.Text; +import org.apache.nutch.crawl.Generator.FloatTextPair; +import org.apache.nutch.crawl.Generator.ScoreHostKeyComparator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test cases for NUTCH-2455: Secondary sorting for efficient HostDb merging. + * Tests the FloatTextPair composite key and ScoreHostKeyComparator. + */ +public class TestGeneratorSecondarySort { + + private ScoreHostKeyComparator comparator; + + public TestGeneratorSecondarySort() { + // Default constructor + } + + @BeforeEach + public void setUp() { + comparator = new ScoreHostKeyComparator(); + } + + /** + * Test FloatTextPair basic functionality. 
+ */ + @Test + public void testFloatTextPairBasics() { + FloatTextPair pair1 = new FloatTextPair(1.5f, "example.com"); + assertEquals(1.5f, pair1.getFirst().get(), 0.001f); + assertEquals("example.com", pair1.getSecond().toString()); + + FloatTextPair pair2 = new FloatTextPair(new FloatWritable(2.0f), new Text("test.org")); + assertEquals(2.0f, pair2.getFirst().get(), 0.001f); + assertEquals("test.org", pair2.getSecond().toString()); + + // Test default constructor + FloatTextPair pair3 = new FloatTextPair(); + pair3.setFirst(new FloatWritable(3.0f)); + pair3.setSecond(new Text("another.com")); + assertEquals(3.0f, pair3.getFirst().get(), 0.001f); + assertEquals("another.com", pair3.getSecond().toString()); + } + + /** + * Test FloatTextPair serialization/deserialization. + */ + @Test + public void testFloatTextPairSerialization() throws IOException { + FloatTextPair original = new FloatTextPair(1.5f, "example.com"); + + // Serialize + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + original.write(out); + out.close(); + + // Deserialize + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream in = new DataInputStream(bais); + FloatTextPair deserialized = new FloatTextPair(); + deserialized.readFields(in); + in.close(); + + assertEquals(original.getFirst().get(), deserialized.getFirst().get(), 0.001f); + assertEquals(original.getSecond().toString(), deserialized.getSecond().toString()); + } + + /** + * Test FloatTextPair equality and hashCode. 
+ */ + @Test + public void testFloatTextPairEquality() { + FloatTextPair pair1 = new FloatTextPair(1.5f, "example.com"); + FloatTextPair pair2 = new FloatTextPair(1.5f, "example.com"); + FloatTextPair pair3 = new FloatTextPair(2.0f, "example.com"); + FloatTextPair pair4 = new FloatTextPair(1.5f, "other.com"); + + assertEquals(pair1, pair2); + assertEquals(pair1.hashCode(), pair2.hashCode()); + assertNotEquals(pair1, pair3); + assertNotEquals(pair1, pair4); + } + + /** + * Test that HostDb entries (with hostname) sort before CrawlDb entries (empty hostname). + * This is the core behavior needed for NUTCH-2455. + */ + @Test + public void testHostDbEntriesSortBeforeCrawlDbEntries() { + // HostDb entry: has hostname in second field + FloatTextPair hostDbEntry = new FloatTextPair(-Float.MAX_VALUE, "example.com"); + + // CrawlDb entry: empty hostname + FloatTextPair crawlDbEntry = new FloatTextPair(1.0f, ""); + + // HostDb should come before CrawlDb + assertTrue(comparator.compare(hostDbEntry, crawlDbEntry) < 0, + "HostDb entry should sort before CrawlDb entry"); + assertTrue(comparator.compare(crawlDbEntry, hostDbEntry) > 0, + "CrawlDb entry should sort after HostDb entry"); + } + + /** + * Test that multiple HostDb entries sort by hostname. + */ + @Test + public void testHostDbEntriesSortByHostname() { + FloatTextPair hostA = new FloatTextPair(-Float.MAX_VALUE, "aaa.com"); + FloatTextPair hostB = new FloatTextPair(-Float.MAX_VALUE, "bbb.com"); + FloatTextPair hostC = new FloatTextPair(-Float.MAX_VALUE, "ccc.com"); + + assertTrue(comparator.compare(hostA, hostB) < 0, + "aaa.com should sort before bbb.com"); + assertTrue(comparator.compare(hostB, hostC) < 0, + "bbb.com should sort before ccc.com"); + assertTrue(comparator.compare(hostA, hostC) < 0, + "aaa.com should sort before ccc.com"); + } + + /** + * Test that multiple CrawlDb entries sort by score (descending). 
+ */ + @Test + public void testCrawlDbEntriesSortByScoreDescending() { + FloatTextPair highScore = new FloatTextPair(10.0f, ""); + FloatTextPair medScore = new FloatTextPair(5.0f, ""); + FloatTextPair lowScore = new FloatTextPair(1.0f, ""); + + // High score should come first (descending order) + assertTrue(comparator.compare(highScore, medScore) < 0, + "High score should sort before medium score"); + assertTrue(comparator.compare(medScore, lowScore) < 0, + "Medium score should sort before low score"); + assertTrue(comparator.compare(highScore, lowScore) < 0, + "High score should sort before low score"); + } + + /** + * Test complete sorting order with mixed HostDb and CrawlDb entries. + */ + @Test + public void testCompleteSortingOrder() { + List entries = new ArrayList<>(); + + // Add CrawlDb entries with various scores + entries.add(new FloatTextPair(5.0f, "")); // CrawlDb medium score + entries.add(new FloatTextPair(10.0f, "")); // CrawlDb high score + entries.add(new FloatTextPair(1.0f, "")); // CrawlDb low score + + // Add HostDb entries + entries.add(new FloatTextPair(-Float.MAX_VALUE, "bbb.com")); + entries.add(new FloatTextPair(-Float.MAX_VALUE, "aaa.com")); + + // Sort using our comparator + Collections.sort(entries, (a, b) -> comparator.compare(a, b)); + + // Verify order: HostDb entries first (sorted by hostname), then CrawlDb entries (by score desc) + assertEquals("aaa.com", entries.get(0).getSecond().toString(), + "First should be HostDb entry aaa.com"); + assertEquals("bbb.com", entries.get(1).getSecond().toString(), + "Second should be HostDb entry bbb.com"); + assertEquals(10.0f, entries.get(2).getFirst().get(), 0.001f, + "Third should be CrawlDb entry with score 10.0"); + assertEquals(5.0f, entries.get(3).getFirst().get(), 0.001f, + "Fourth should be CrawlDb entry with score 5.0"); + assertEquals(1.0f, entries.get(4).getFirst().get(), 0.001f, + "Fifth should be CrawlDb entry with score 1.0"); + } + + /** + * Test FloatTextPair compareTo method. 
+ */ + @Test + public void testFloatTextPairCompareTo() { + FloatTextPair pair1 = new FloatTextPair(1.0f, "a"); + FloatTextPair pair2 = new FloatTextPair(1.0f, "b"); + FloatTextPair pair3 = new FloatTextPair(2.0f, "a"); + + // Different scores - compare by score first + assertTrue(pair1.compareTo(pair3) < 0); + assertTrue(pair3.compareTo(pair1) > 0); + + // Same score - compare by text + assertTrue(pair1.compareTo(pair2) < 0); + assertTrue(pair2.compareTo(pair1) > 0); + + // Equal + FloatTextPair pair4 = new FloatTextPair(1.0f, "a"); + assertEquals(0, pair1.compareTo(pair4)); + } + + /** + * Test toString method. + */ + @Test + public void testFloatTextPairToString() { + FloatTextPair pair = new FloatTextPair(1.5f, "example.com"); + String str = pair.toString(); + assertTrue(str.contains("1.5"), "toString should contain score"); + assertTrue(str.contains("example.com"), "toString should contain hostname"); + } +}