From 60eb705394fe93cfd120bedd77328d6fb404806f Mon Sep 17 00:00:00 2001 From: Semyon Semyonov Date: Wed, 29 Nov 2017 14:47:01 +0100 Subject: [PATCH 1/7] fix for NUTCH-2461 generate with maxcount equals 0 --- src/java/org/apache/nutch/crawl/Generator.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/nutch/crawl/Generator.java b/src/java/org/apache/nutch/crawl/Generator.java index 5c098c58d8..6c74aab73c 100644 --- a/src/java/org/apache/nutch/crawl/Generator.java +++ b/src/java/org/apache/nutch/crawl/Generator.java @@ -377,7 +377,6 @@ public void reduce(FloatWritable key, Iterator values, LOG.info("Generator: variable maxCount: {} for {}", variableMaxCount, hostname); maxCount = (int)variableMaxCount; } - if (fetchDelayExpr != null) { long variableFetchDelay = Math.round((double)fetchDelayExpr.evaluate(createContext(host))); LOG.info("Generator: variable fetchDelay: {} ms for {}", variableFetchDelay, hostname); @@ -386,6 +385,10 @@ public void reduce(FloatWritable key, Iterator values, } } + if(maxCount == 0){ + continue; + } + // Got a non-zero variable fetch delay? Add it to the datum's metadata if (variableFetchDelayWritable != null) { entry.datum.getMetaData().put(variableFetchDelayKey, variableFetchDelayWritable); From 97083fa17234194ab2a35e9fbf7f11de42d87bef Mon Sep 17 00:00:00 2001 From: Semyon Semyonov Date: Wed, 17 Jan 2018 15:26:10 +0100 Subject: [PATCH 2/7] NUTCH-2481. HostDatum deltas expressions --- conf/nutch-default.xml | 9 + .../org/apache/nutch/hostdb/UpdateHostDb.java | 2 +- .../nutch/hostdb/UpdateHostDbMapper.java | 13 +- .../nutch/hostdb/UpdateHostDbReducer.java | 286 +++++++++++------- 4 files changed, 202 insertions(+), 108 deletions(-) diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml index f3f48a36ac..0208ef8c02 100644 --- a/conf/nutch-default.xml +++ b/conf/nutch-default.xml @@ -2588,6 +2588,15 @@ visit https://wiki.apache.org/nutch/SimilarityScoringFilter--> + + hostdb.deltaExpression + + + The expression for calculation of the delta statistics, the differences between of value of hostdb after update(currentHostDatum) and the value before(previousHostDatum). The return value in the KeyValuePair(String,Integer) format is written to the metadata of the hostdb. +For example, {return new ("javafx.util.Pair","FetchedDelta", currentHostDatum.fetched - previousHostDatum.fetched);} + + + diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java index 6a3c4a9e8a..d422d02629 100644 --- a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java +++ b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java @@ -52,7 +52,6 @@ import org.apache.nutch.util.NutchConfiguration; import org.apache.nutch.util.NutchJob; import org.apache.nutch.util.TimingUtil; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +77,7 @@ public class UpdateHostDb extends Configured implements Tool { public static final String HOSTDB_NUMERIC_FIELDS = "hostdb.numeric.fields"; public static final String HOSTDB_STRING_FIELDS = "hostdb.string.fields"; public static final String HOSTDB_PERCENTILES = "hostdb.percentiles"; + public static final String HOSTDB_UPDATEDB_DELTA_EXPRESSION = "hostdb.deltaExpression"; private void updateHostDb(Path hostDb, Path crawlDb, Path topHosts, boolean checkFailed, boolean checkNew, boolean checkKnown, diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java b/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java index 69df4d17f0..3946204bd1 100644 --- a/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java +++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java @@ -19,7 +19,7 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; - +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -27,7 +27,6 @@ import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; - import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.CrawlDb; import org.apache.nutch.crawl.NutchWritable; @@ -36,7 +35,6 @@ import org.apache.nutch.net.URLNormalizers; import org.apache.nutch.protocol.ProtocolStatus; import org.apache.nutch.util.URLUtil; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +60,7 @@ public class UpdateHostDbMapper protected URLFilters filters = null; protected URLNormalizers normalizers = null; + private boolean isDeltaStatisticCalculated = false; public void close() {} /** @@ -71,7 +70,8 @@ public void configure(JobConf job) { readingCrawlDb = job.getBoolean("hostdb.reading.crawldb", false); filter = job.getBoolean(UpdateHostDb.HOSTDB_URL_FILTERING, false); normalize = job.getBoolean(UpdateHostDb.HOSTDB_URL_NORMALIZING, false); - + isDeltaStatisticCalculated = !StringUtils.isEmpty(job.get(UpdateHostDb.HOSTDB_UPDATEDB_DELTA_EXPRESSION)); + if (filter) filters = new URLFilters(job); if (normalize) @@ -212,9 +212,10 @@ public void map(Text key, Writable value, // If we're also reading CrawlDb entries, reset db_* statistics because // we're aggregating them from CrawlDB anyway - if (readingCrawlDb) { + + if (readingCrawlDb && !isDeltaStatisticCalculated) { hostDatum.resetStatistics(); - } + } output.collect(key, new NutchWritable(hostDatum)); } diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java index 6ebe3d34f5..595458281a 100644 --- a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java +++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java @@ -19,14 +19,20 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Date; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.HashMap; -import java.util.Map; +import javafx.util.Pair; + +import org.apache.commons.jexl2.Expression; +import org.apache.commons.jexl2.JexlContext; +import org.apache.commons.jexl2.JexlEngine; +import org.apache.commons.jexl2.MapContext; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; @@ -36,24 +42,22 @@ import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.StringUtils; - import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.NutchWritable; - -import com.tdunning.math.stats.TDigest; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.tdunning.math.stats.TDigest; + /** * * */ -public class UpdateHostDbReducer - implements Reducer { +public class UpdateHostDbReducer implements + Reducer { - private static final Logger LOG = LoggerFactory - .getLogger(MethodHandles.lookup().lookupClass()); + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles + .lookup().lookupClass()); protected ResolverThread resolverThread = null; protected Integer numResolverThreads = 10; protected static Integer purgeFailedHostsThreshold = -1; @@ -68,18 +72,23 @@ public class UpdateHostDbReducer protected static int[] percentiles; protected static Text[] numericFieldWritables; protected static Text[] stringFieldWritables; - + protected BlockingQueue queue = new SynchronousQueue<>(); protected ThreadPoolExecutor executor = null; + protected String stringDeltaExpression = null; + protected Expression deltaExpression = null; + /** - * Configures the thread pool and prestarts all resolver threads. - * - * @param job - */ + * Configures the thread pool and prestarts all resolver threads. + * + * @param job + */ public void configure(JobConf job) { - purgeFailedHostsThreshold = job.getInt(UpdateHostDb.HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD, -1); - numResolverThreads = job.getInt(UpdateHostDb.HOSTDB_NUM_RESOLVER_THREADS, 10); + purgeFailedHostsThreshold = job.getInt( + UpdateHostDb.HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD, -1); + numResolverThreads = job.getInt(UpdateHostDb.HOSTDB_NUM_RESOLVER_THREADS, + 10); recheckInterval = job.getInt(UpdateHostDb.HOSTDB_RECHECK_INTERVAL, 86400) * 1000; checkFailed = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_FAILED, false); checkNew = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_NEW, false); @@ -88,7 +97,7 @@ public void configure(JobConf job) { numericFields = job.getStrings(UpdateHostDb.HOSTDB_NUMERIC_FIELDS); stringFields = job.getStrings(UpdateHostDb.HOSTDB_STRING_FIELDS); percentiles = job.getInts(UpdateHostDb.HOSTDB_PERCENTILES); - + // What fields do we need to collect metadata from if (numericFields != null) { numericFieldWritables = new Text[numericFields.length]; @@ -96,7 +105,7 @@ public void configure(JobConf job) { numericFieldWritables[i] = new Text(numericFields[i]); } } - + if (stringFields != null) { stringFieldWritables = new Text[stringFields.length]; for (int i = 0; i < stringFields.length; i++) { @@ -104,79 +113,112 @@ public void configure(JobConf job) { } } + stringDeltaExpression = job + .get(UpdateHostDb.HOSTDB_UPDATEDB_DELTA_EXPRESSION); + if (!org.apache.commons.lang3.StringUtils.isEmpty(stringDeltaExpression)) { + // Create or retrieve a JexlEngine + JexlEngine jexl = new JexlEngine(); + + // Dont't be silent and be strict + jexl.setSilent(true); + jexl.setStrict(true); + + // Create an expression object + this.deltaExpression = jexl.createExpression(stringDeltaExpression); + } + // Initialize the thread pool with our queue executor = new ThreadPoolExecutor(numResolverThreads, numResolverThreads, - 5, TimeUnit.SECONDS, queue); + 5, TimeUnit.SECONDS, queue); // Run all threads in the pool executor.prestartAllCoreThreads(); } + public Pair evaluateDeltaExpression( + HostDatum currentHostDatum, HostDatum previousHostDatum) { + Pair results = null; + + JexlContext jcontext = new MapContext(); + jcontext.set("currentHostDatum", currentHostDatum); + jcontext.set("previousHostDatum", previousHostDatum); + try { + results = (Pair) deltaExpression.evaluate(jcontext); + return results; + } catch (Exception e) { + LOG.info("Evaluate delta expression exception: " + e.toString()); + } + return results; + } + /** * */ public void reduce(Text key, Iterator values, - OutputCollector output, Reporter reporter) throws IOException { - - Map> stringCounts = new HashMap<>(); - Map maximums = new HashMap<>(); - Map sums = new HashMap<>(); // used to calc averages - Map counts = new HashMap<>(); // used to calc averages - Map minimums = new HashMap<>(); - Map tdigests = new HashMap(); - + OutputCollector output, Reporter reporter) + throws IOException { + + Map> stringCounts = new HashMap<>(); + Map maximums = new HashMap<>(); + Map sums = new HashMap<>(); // used to calc averages + Map counts = new HashMap<>(); // used to calc averages + Map minimums = new HashMap<>(); + Map tdigests = new HashMap(); + HostDatum hostDatum = new HostDatum(); + HostDatum histrocialHostDatum = null; float score = 0; - + if (stringFields != null) { for (int i = 0; i < stringFields.length; i++) { stringCounts.put(stringFields[i], new HashMap<>()); } } - + // Loop through all values until we find a non-empty HostDatum or use // an empty if this is a new host for the host db while (values.hasNext()) { Writable value = values.next().get(); - + // Count crawl datum status's and collect metadata from fields if (value instanceof CrawlDatum) { - CrawlDatum buffer = (CrawlDatum)value; - + CrawlDatum buffer = (CrawlDatum) value; + // Set the correct status field switch (buffer.getStatus()) { - case CrawlDatum.STATUS_DB_UNFETCHED: - hostDatum.setUnfetched(hostDatum.getUnfetched() + 1); - break; + case CrawlDatum.STATUS_DB_UNFETCHED: + hostDatum.setUnfetched(hostDatum.getUnfetched() + 1); + break; - case CrawlDatum.STATUS_DB_FETCHED: - hostDatum.setFetched(hostDatum.getFetched() + 1); - break; + case CrawlDatum.STATUS_DB_FETCHED: + hostDatum.setFetched(hostDatum.getFetched() + 1); + break; - case CrawlDatum.STATUS_DB_GONE: - hostDatum.setGone(hostDatum.getGone() + 1); - break; + case CrawlDatum.STATUS_DB_GONE: + hostDatum.setGone(hostDatum.getGone() + 1); + break; - case CrawlDatum.STATUS_DB_REDIR_TEMP: - hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1); - break; + case CrawlDatum.STATUS_DB_REDIR_TEMP: + hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1); + break; - case CrawlDatum.STATUS_DB_REDIR_PERM: - hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1); - break; + case CrawlDatum.STATUS_DB_REDIR_PERM: + hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1); + break; - case CrawlDatum.STATUS_DB_NOTMODIFIED: - hostDatum.setNotModified(hostDatum.getNotModified() + 1); - break; + case CrawlDatum.STATUS_DB_NOTMODIFIED: + hostDatum.setNotModified(hostDatum.getNotModified() + 1); + break; } - + // Record connection failures if (buffer.getRetriesSinceFetch() != 0) { hostDatum.incConnectionFailures(); } - + // Only gather metadata statistics for proper fetched pages - if (buffer.getStatus() == CrawlDatum.STATUS_DB_FETCHED || buffer.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) { + if (buffer.getStatus() == CrawlDatum.STATUS_DB_FETCHED + || buffer.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) { // Deal with the string fields if (stringFields != null) { for (int i = 0; i < stringFields.length; i++) { @@ -185,15 +227,19 @@ public void reduce(Text key, Iterator values, // Get it! String metadataValue = null; try { - metadataValue = buffer.getMetaData().get(stringFieldWritables[i]).toString(); + metadataValue = buffer.getMetaData() + .get(stringFieldWritables[i]).toString(); } catch (Exception e) { - LOG.error("Metadata field " + stringFields[i] + " is probably not a numeric value"); + LOG.error("Metadata field " + stringFields[i] + + " is probably not a numeric value"); } - + // Does the value exist? - if (stringCounts.get(stringFields[i]).containsKey(metadataValue)) { + if (stringCounts.get(stringFields[i]) + .containsKey(metadataValue)) { // Yes, increment it - stringCounts.get(stringFields[i]).put(metadataValue, stringCounts.get(stringFields[i]).get(metadataValue) + 1); + stringCounts.get(stringFields[i]).put(metadataValue, + stringCounts.get(stringFields[i]).get(metadataValue) + 1); } else { // Create it! stringCounts.get(stringFields[i]).put(metadataValue, 1); @@ -201,7 +247,7 @@ public void reduce(Text key, Iterator values, } } } - + // Deal with the numeric fields if (numericFields != null) { for (int i = 0; i < numericFields.length; i++) { @@ -209,18 +255,19 @@ public void reduce(Text key, Iterator values, if (buffer.getMetaData().get(numericFieldWritables[i]) != null) { try { // Get it! - Float metadataValue = Float.parseFloat(buffer.getMetaData().get(numericFieldWritables[i]).toString()); - + Float metadataValue = Float.parseFloat(buffer.getMetaData() + .get(numericFieldWritables[i]).toString()); + // Does the median value exist? if (tdigests.containsKey(numericFields[i])) { tdigests.get(numericFields[i]).add(metadataValue); } else { // Create it! TDigest tdigest = TDigest.createDigest(100); - tdigest.add((double)metadataValue); + tdigest.add((double) metadataValue); tdigests.put(numericFields[i], tdigest); } - + // Does the minimum value exist? if (minimums.containsKey(numericFields[i])) { // Write if this is lower than existing value @@ -231,7 +278,7 @@ public void reduce(Text key, Iterator values, // Create it! minimums.put(numericFields[i], metadataValue); } - + // Does the maximum value exist? if (maximums.containsKey(numericFields[i])) { // Write if this is lower than existing value @@ -242,29 +289,43 @@ public void reduce(Text key, Iterator values, // Create it! maximums.put(numericFields[i], metadataValue); } - + // Sum it up! if (sums.containsKey(numericFields[i])) { // Increment - sums.put(numericFields[i], sums.get(numericFields[i]) + metadataValue); - counts.put(numericFields[i], counts.get(numericFields[i]) + 1); + sums.put(numericFields[i], sums.get(numericFields[i]) + + metadataValue); + counts.put(numericFields[i], + counts.get(numericFields[i]) + 1); } else { // Create it! sums.put(numericFields[i], metadataValue); counts.put(numericFields[i], 1); } } catch (Exception e) { - LOG.error(e.getMessage() + " when processing values for " + key.toString()); + LOG.error(e.getMessage() + " when processing values for " + + key.toString()); } } } } } } - - // + + // if (value instanceof HostDatum) { - HostDatum buffer = (HostDatum)value; + HostDatum buffer = (HostDatum) value; + + if (!org.apache.commons.lang3.StringUtils + .isEmpty(stringDeltaExpression)) { + try { + histrocialHostDatum = (HostDatum) buffer.clone(); + } catch (CloneNotSupportedException e) { + histrocialHostDatum = null; + } finally { + buffer.resetStatistics(); + } + } // Check homepage URL if (buffer.hasHomepageUrl()) { @@ -285,7 +346,7 @@ public void reduce(Text key, Iterator values, if (buffer.getConnectionFailures() > 0) { hostDatum.setConnectionFailures(buffer.getConnectionFailures()); } - + // Check metadata if (!buffer.getMetaData().isEmpty()) { hostDatum.setMetaData(buffer.getMetaData()); @@ -299,7 +360,7 @@ public void reduce(Text key, Iterator values, // Check for the score if (value instanceof FloatWritable) { - FloatWritable buffer = (FloatWritable)value; + FloatWritable buffer = (FloatWritable) value; score = buffer.get(); } } @@ -308,35 +369,56 @@ public void reduce(Text key, Iterator values, if (score > 0) { hostDatum.setScore(score); } - + // Set metadata - for (Map.Entry> entry : stringCounts.entrySet()) { - for (Map.Entry subEntry : entry.getValue().entrySet()) { - hostDatum.getMetaData().put(new Text(entry.getKey() + "." + subEntry.getKey()), new IntWritable(subEntry.getValue())); + if (deltaExpression != null) { + Pair deltaExpressionResultPair = evaluateDeltaExpression( + hostDatum, histrocialHostDatum); + + if (deltaExpressionResultPair != null) { + hostDatum.getMetaData().put( + new Text(deltaExpressionResultPair.getKey()), + new IntWritable(deltaExpressionResultPair.getValue())); + } + } + + for (Map.Entry> entry : stringCounts + .entrySet()) { + for (Map.Entry subEntry : entry.getValue().entrySet()) { + hostDatum.getMetaData().put( + new Text(entry.getKey() + "." + subEntry.getKey()), + new IntWritable(subEntry.getValue())); } } for (Map.Entry entry : maximums.entrySet()) { - hostDatum.getMetaData().put(new Text("max." + entry.getKey()), new FloatWritable(entry.getValue())); + hostDatum.getMetaData().put(new Text("max." + entry.getKey()), + new FloatWritable(entry.getValue())); } for (Map.Entry entry : sums.entrySet()) { - hostDatum.getMetaData().put(new Text("avg." + entry.getKey()), new FloatWritable(entry.getValue() / counts.get(entry.getKey()))); + hostDatum.getMetaData().put(new Text("avg." + entry.getKey()), + new FloatWritable(entry.getValue() / counts.get(entry.getKey()))); } for (Map.Entry entry : tdigests.entrySet()) { // Emit all percentiles for (int i = 0; i < percentiles.length; i++) { - hostDatum.getMetaData().put(new Text("pct" + Integer.toString(percentiles[i]) + "." + entry.getKey()), new FloatWritable((float)entry.getValue().quantile(0.5))); + hostDatum.getMetaData().put( + new Text("pct" + Integer.toString(percentiles[i]) + "." + + entry.getKey()), + new FloatWritable((float) entry.getValue().quantile(0.5))); } - } + } for (Map.Entry entry : minimums.entrySet()) { - hostDatum.getMetaData().put(new Text("min." + entry.getKey()), new FloatWritable(entry.getValue())); + hostDatum.getMetaData().put(new Text("min." + entry.getKey()), + new FloatWritable(entry.getValue())); } - + reporter.incrCounter("UpdateHostDb", "total_hosts", 1); // See if this record is to be checked if (shouldCheck(hostDatum)) { // Make an entry - resolverThread = new ResolverThread(key.toString(), hostDatum, output, reporter, purgeFailedHostsThreshold); + resolverThread = new ResolverThread(key.toString(), hostDatum, output, + reporter, purgeFailedHostsThreshold); // Add the entry to the queue (blocking) try { @@ -357,11 +439,11 @@ public void reduce(Text key, Iterator values, } /** - * Determines whether a record should be checked. - * - * @param datum - * @return boolean - */ + * Determines whether a record should be checked. + * + * @param datum + * @return boolean + */ protected boolean shouldCheck(HostDatum datum) { // Whether a new record is to be checked if (checkNew && datum.isEmpty()) { @@ -383,15 +465,16 @@ protected boolean shouldCheck(HostDatum datum) { } /** - * Determines whether a record is eligible for recheck. - * - * @param datum - * @return boolean - */ + * Determines whether a record is eligible for recheck. + * + * @param datum + * @return boolean + */ protected boolean isEligibleForCheck(HostDatum datum) { // Whether an existing host, known or unknown, if forced to be rechecked - if (force || datum.getLastCheck().getTime() + - (recheckInterval * datum.getDnsFailures() + 1) > now) { + if (force + || datum.getLastCheck().getTime() + + (recheckInterval * datum.getDnsFailures() + 1) > now) { return true; } @@ -399,8 +482,8 @@ protected boolean isEligibleForCheck(HostDatum datum) { } /** - * Shut down all running threads and wait for completion. - */ + * Shut down all running threads and wait for completion. + */ public void close() { LOG.info("UpdateHostDb: feeder finished, waiting for shutdown"); @@ -414,7 +497,8 @@ public void close() { try { // Wait for the executor to shut down completely if (!executor.isTerminated()) { - LOG.info("UpdateHostDb: resolver threads waiting: " + Integer.toString(executor.getPoolSize())); + LOG.info("UpdateHostDb: resolver threads waiting: " + + Integer.toString(executor.getPoolSize())); Thread.sleep(1000); } else { // All is well, get out From b2612a484aa30e9ca22ad3c5762d2b449e5972da Mon Sep 17 00:00:00 2001 From: Semyon Semyonov Date: Wed, 17 Jan 2018 15:55:27 +0100 Subject: [PATCH 3/7] Revert "fix for NUTCH-2461 generate with maxcount equals 0" This reverts commit 60eb705394fe93cfd120bedd77328d6fb404806f. --- src/java/org/apache/nutch/crawl/Generator.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/java/org/apache/nutch/crawl/Generator.java b/src/java/org/apache/nutch/crawl/Generator.java index 6c74aab73c..5c098c58d8 100644 --- a/src/java/org/apache/nutch/crawl/Generator.java +++ b/src/java/org/apache/nutch/crawl/Generator.java @@ -377,6 +377,7 @@ public void reduce(FloatWritable key, Iterator values, LOG.info("Generator: variable maxCount: {} for {}", variableMaxCount, hostname); maxCount = (int)variableMaxCount; } + if (fetchDelayExpr != null) { long variableFetchDelay = Math.round((double)fetchDelayExpr.evaluate(createContext(host))); LOG.info("Generator: variable fetchDelay: {} ms for {}", variableFetchDelay, hostname); @@ -385,10 +386,6 @@ public void reduce(FloatWritable key, Iterator values, } } - if(maxCount == 0){ - continue; - } - // Got a non-zero variable fetch delay? Add it to the datum's metadata if (variableFetchDelayWritable != null) { entry.datum.getMetaData().put(variableFetchDelayKey, variableFetchDelayWritable); From 226f4b1f52f6673da16a95ff7b2624e1c008ae15 Mon Sep 17 00:00:00 2001 From: Semyon Semyonov Date: Fri, 19 Jan 2018 12:28:10 +0100 Subject: [PATCH 4/7] addition for NUTCH-2481 delta expression. Now expressions can return both (strng,number) and (string, string) pairs --- .../nutch/hostdb/UpdateHostDbReducer.java | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java index 595458281a..e9e13a245a 100644 --- a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java +++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java @@ -135,15 +135,15 @@ public void configure(JobConf job) { executor.prestartAllCoreThreads(); } - public Pair evaluateDeltaExpression( + public Pair evaluateDeltaExpression( HostDatum currentHostDatum, HostDatum previousHostDatum) { - Pair results = null; + Pair results = null; JexlContext jcontext = new MapContext(); jcontext.set("currentHostDatum", currentHostDatum); jcontext.set("previousHostDatum", previousHostDatum); try { - results = (Pair) deltaExpression.evaluate(jcontext); + results = (Pair) deltaExpression.evaluate(jcontext); return results; } catch (Exception e) { LOG.info("Evaluate delta expression exception: " + e.toString()); @@ -372,13 +372,27 @@ public void reduce(Text key, Iterator values, // Set metadata if (deltaExpression != null) { - Pair deltaExpressionResultPair = evaluateDeltaExpression( + Pair deltaExpressionResultPair = evaluateDeltaExpression( hostDatum, histrocialHostDatum); - if (deltaExpressionResultPair != null) { - hostDatum.getMetaData().put( - new Text(deltaExpressionResultPair.getKey()), - new IntWritable(deltaExpressionResultPair.getValue())); + try { + if (deltaExpressionResultPair != null) { + Object deltaExpressionResultPairValue = deltaExpressionResultPair + .getValue(); + if (deltaExpressionResultPairValue instanceof Number) { + hostDatum.getMetaData().put( + new Text(deltaExpressionResultPair.getKey()), + new FloatWritable(((Number) deltaExpressionResultPair + .getValue()).floatValue())); + } else { + hostDatum.getMetaData().put( + new Text(deltaExpressionResultPair.getKey()), + new Text(deltaExpressionResultPair.getValue().toString())); + } + } + } catch (Exception e) { + LOG.info("Exception of deltaExpression results convertion. The key should be a string, the value is either string or float : " + + e.getMessage()); } } From 555bfff26ac3dbff6a5d9ad7ca936367076bbcac Mon Sep 17 00:00:00 2001 From: Semyon Semyonov Date: Mon, 22 Jan 2018 09:37:03 +0100 Subject: [PATCH 5/7] NUTCH-2481. Changed the descprition of hostdb.deltaExpression in the config. Now the format of the return key can be either Number of String --- conf/nutch-default.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml index 0208ef8c02..35c7f276d6 100644 --- a/conf/nutch-default.xml +++ b/conf/nutch-default.xml @@ -2592,7 +2592,7 @@ visit https://wiki.apache.org/nutch/SimilarityScoringFilter--> hostdb.deltaExpression - The expression for calculation of the delta statistics, the differences between of value of hostdb after update(currentHostDatum) and the value before(previousHostDatum). The return value in the KeyValuePair(String,Integer) format is written to the metadata of the hostdb. + The expression for calculation of the delta statistics, the differences between of value of hostdb after update(currentHostDatum) and the value before(previousHostDatum). The return value in the KeyValuePair(String,Number) or KeyValuePair(String, String) format is written to the metadata of the hostdb. For example, {return new ("javafx.util.Pair","FetchedDelta", currentHostDatum.fetched - previousHostDatum.fetched);} From a898b5c8f8220f717745fbcaa4e24c3fb9ac974d Mon Sep 17 00:00:00 2001 From: Semyon Semyonov Date: Mon, 22 Jan 2018 09:44:17 +0100 Subject: [PATCH 6/7] Merge branch 'master', remote branch 'origin' From 785c354d08a916445e19a8b112d3709c67a09211 Mon Sep 17 00:00:00 2001 From: Semyon Semyonov Date: Mon, 12 Feb 2018 17:44:24 +0100 Subject: [PATCH 7/7] refactoring for usage of nutch JexlUtil --- .../org/apache/nutch/hostdb/ReadHostDb.java | 138 +++++++++--------- .../nutch/hostdb/UpdateHostDbReducer.java | 13 +- 2 files changed, 74 insertions(+), 77 deletions(-) diff --git a/src/java/org/apache/nutch/hostdb/ReadHostDb.java b/src/java/org/apache/nutch/hostdb/ReadHostDb.java index eac3bf6455..010327ebd6 100644 --- a/src/java/org/apache/nutch/hostdb/ReadHostDb.java +++ b/src/java/org/apache/nutch/hostdb/ReadHostDb.java @@ -21,46 +21,41 @@ import java.text.SimpleDateFormat; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.commons.jexl2.Expression; +import org.apache.commons.jexl2.JexlContext; +import org.apache.commons.jexl2.MapContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.nutch.util.NutchConfiguration; -import org.apache.nutch.util.StringUtil; import org.apache.nutch.util.TimingUtil; -import org.apache.nutch.util.URLUtil; - -import org.apache.commons.jexl2.JexlContext; -import org.apache.commons.jexl2.Expression; -import org.apache.commons.jexl2.JexlEngine; -import org.apache.commons.jexl2.MapContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * @see Commons + * @see Commons */ public class ReadHostDb extends Configured implements Tool { - private static final Logger LOG = LoggerFactory - .getLogger(MethodHandles.lookup().lookupClass()); + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles + .lookup().lookupClass()); public static final String HOSTDB_DUMP_HEADER = "hostdb.dump.field.header"; public static final String HOSTDB_DUMP_HOSTNAMES = "hostdb.dump.hostnames"; @@ -75,33 +70,33 @@ static class ReadHostDbMapper extends Mapper { protected Expression expr = null; public void setup(Context context) { - dumpHomepages = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOMEPAGES, false); - dumpHostnames = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOSTNAMES, false); - fieldHeader = context.getConfiguration().getBoolean(HOSTDB_DUMP_HEADER, true); + dumpHomepages = context.getConfiguration().getBoolean( + HOSTDB_DUMP_HOMEPAGES, false); + dumpHostnames = context.getConfiguration().getBoolean( + HOSTDB_DUMP_HOSTNAMES, false); + fieldHeader = context.getConfiguration().getBoolean(HOSTDB_DUMP_HEADER, + true); String expr = context.getConfiguration().get(HOSTDB_FILTER_EXPRESSION); if (expr != null) { - // Create or retrieve a JexlEngine - JexlEngine jexl = new JexlEngine(); - - // Dont't be silent and be strict - jexl.setSilent(true); - jexl.setStrict(true); - - // Create an expression object - this.expr = jexl.createExpression(expr); + this.expr = org.apache.nutch.util.JexlUtil.parseExpression(expr); } } - public void map(Text key, HostDatum datum, Context context) throws IOException, InterruptedException { + public void map(Text key, HostDatum datum, Context context) + throws IOException, InterruptedException { if (fieldHeader && !dumpHomepages && !dumpHostnames) { - context.write(new Text("hostname"), new Text("unfetched\tfetched\tgone\tredirTemp\tredirPerm\tredirSum\tok\tnumRecords\tdnsFail\tcnxFail\tsumFail\tscore\tlastCheck\thomepage\tmetadata")); + context + .write( + new Text("hostname"), + new Text( + "unfetched\tfetched\tgone\tredirTemp\tredirPerm\tredirSum\tok\tnumRecords\tdnsFail\tcnxFail\tsumFail\tscore\tlastCheck\thomepage\tmetadata")); fieldHeader = false; } - + if (expr != null) { // Create a context and add data JexlContext jcontext = new MapContext(); - + // Set some fixed variables jcontext.set("unfetched", datum.getUnfetched()); jcontext.set("fetched", datum.getFetched()); @@ -114,24 +109,25 @@ public void map(Text key, HostDatum datum, Context context) throws IOException, jcontext.set("numRecords", datum.numRecords()); jcontext.set("dnsFailures", datum.getDnsFailures()); jcontext.set("connectionFailures", datum.getConnectionFailures()); - + // Set metadata variables - for (Map.Entry entry : datum.getMetaData().entrySet()) { + for (Map.Entry entry : datum.getMetaData() + .entrySet()) { Object value = entry.getValue(); - + if (value instanceof FloatWritable) { - FloatWritable fvalue = (FloatWritable)value; - Text tkey = (Text)entry.getKey(); + FloatWritable fvalue = (FloatWritable) value; + Text tkey = (Text) entry.getKey(); jcontext.set(tkey.toString(), fvalue.get()); } - + if (value instanceof IntWritable) { - IntWritable ivalue = (IntWritable)value; - Text tkey = (Text)entry.getKey(); + IntWritable ivalue = (IntWritable) value; + Text tkey = (Text) entry.getKey(); jcontext.set(tkey.toString(), ivalue.get()); } } - + // Filter this record if evaluation did not pass try { if (!Boolean.TRUE.equals(expr.evaluate(jcontext))) { @@ -141,35 +137,38 @@ public void map(Text key, HostDatum datum, Context context) throws IOException, LOG.info(e.toString() + " for " + key.toString()); } } - + if (dumpHomepages) { if (datum.hasHomepageUrl()) { context.write(new Text(datum.getHomepageUrl()), emptyText); } return; } - + if (dumpHostnames) { context.write(key, emptyText); return; } - + // Write anyway context.write(key, new Text(datum.toString())); } } - // Todo, reduce unknown hosts to single unknown domain if possible. Enable via configuration + // Todo, reduce unknown hosts to single unknown domain if possible. Enable via + // configuration // host_a.example.org,host_a.example.org ==> example.org -// static class ReadHostDbReducer extends Reduce { -// public void setup(Context context) { } -// -// public void reduce(Text domain, Iterable hosts, Context context) throws IOException, InterruptedException { -// -// } -// } - - private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, boolean dumpHostnames, String expr) throws Exception { + // static class ReadHostDbReducer extends Reduce { + // public void setup(Context context) { } + // + // public void reduce(Text domain, Iterable hosts, Context context) + // throws IOException, InterruptedException { + // + // } + // } + + private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, + boolean dumpHostnames, String expr) throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long start = System.currentTimeMillis(); LOG.info("ReadHostDb: starting at " + sdf.format(start)); @@ -182,7 +181,7 @@ private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, boolean } conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); conf.set("mapred.textoutputformat.separator", "\t"); - + Job job = Job.getInstance(conf); job.setJobName("ReadHostDb"); job.setJarByClass(ReadHostDb.class); @@ -217,22 +216,24 @@ private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, boolean } long end = System.currentTimeMillis(); - LOG.info("ReadHostDb: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end)); + LOG.info("ReadHostDb: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); } - + private void getHostDbRecord(Path hostDb, String host) throws Exception { Configuration conf = getConf(); - SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(conf, hostDb); + SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(conf, + hostDb); Class keyClass = readers[0].getKeyClass(); Class valueClass = readers[0].getValueClass(); - + if (!keyClass.getName().equals("org.apache.hadoop.io.Text")) throw new IOException("Incompatible key (" + keyClass.getName() + ")"); - + Text key = (Text) keyClass.newInstance(); HostDatum value = (HostDatum) valueClass.newInstance(); - + for (int i = 0; i < readers.length; i++) { while (readers[i].next(key, value)) { if (host.equals(key.toString())) { @@ -240,17 +241,19 @@ private void getHostDbRecord(Path hostDb, String host) throws Exception { } } readers[i].close(); - } + } } public static void main(String args[]) throws Exception { - int res = ToolRunner.run(NutchConfiguration.create(), new ReadHostDb(), args); + int res = ToolRunner.run(NutchConfiguration.create(), new ReadHostDb(), + args); System.exit(res); } public int run(String[] args) throws Exception { if (args.length < 2) { - System.err.println("Usage: ReadHostDb [-get ] [ [-dumpHomepages | -dumpHostnames | -expr ]]"); + System.err + .println("Usage: ReadHostDb [-get ] [ [-dumpHomepages | -dumpHostnames | -expr ]]"); return -1; } @@ -270,7 +273,7 @@ public int run(String[] args) throws Exception { } if (args[i].equals("-get")) { get = args[i + 1]; - LOG.info("ReadHostDb: get: "+ get); + LOG.info("ReadHostDb: get: " + get); i++; } if (args[i].equals("-expr")) { @@ -284,7 +287,8 @@ public int run(String[] args) throws Exception { if (get != null) { getHostDbRecord(new Path(args[0], "current"), get); } else { - readHostDb(new Path(args[0]), new Path(args[1]), dumpHomepages, dumpHostnames, expr); + readHostDb(new Path(args[0]), new Path(args[1]), dumpHomepages, + dumpHostnames, expr); } return 0; } catch (Exception e) { diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java index e9e13a245a..32de2efd8c 100644 --- a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java +++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java @@ -31,7 +31,6 @@ import org.apache.commons.jexl2.Expression; import org.apache.commons.jexl2.JexlContext; -import org.apache.commons.jexl2.JexlEngine; import org.apache.commons.jexl2.MapContext; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; @@ -115,16 +114,10 @@ public void configure(JobConf job) { stringDeltaExpression = job .get(UpdateHostDb.HOSTDB_UPDATEDB_DELTA_EXPRESSION); - if (!org.apache.commons.lang3.StringUtils.isEmpty(stringDeltaExpression)) { - // Create or retrieve a JexlEngine - JexlEngine jexl = new JexlEngine(); - - // Dont't be silent and be strict - jexl.setSilent(true); - jexl.setStrict(true); - // Create an expression object - this.deltaExpression = jexl.createExpression(stringDeltaExpression); + if (!org.apache.commons.lang3.StringUtils.isEmpty(stringDeltaExpression)) { + this.deltaExpression = org.apache.nutch.util.JexlUtil + .parseExpression(stringDeltaExpression); } // Initialize the thread pool with our queue