diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index 87c405883e..709da16cd1 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -2596,6 +2596,15 @@ visit https://wiki.apache.org/nutch/SimilarityScoringFilter-->
+<property>
+  <name>hostdb.deltaExpression</name>
+  <value></value>
+  <description>
+  The JEXL expression used to calculate delta statistics: the differences between the HostDb values after the update (currentHostDatum) and the values before it (previousHostDatum). The result, a Pair(String, Number) or a Pair(String, String), is written to the HostDb metadata.
+  For example: new("javafx.util.Pair", "FetchedDelta", currentHostDatum.fetched - previousHostDatum.fetched)
+  </description>
+</property>
+
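To make the contract of this property concrete, here is a minimal, self-contained sketch (not part of the patch) of how such an expression evaluates under commons-jexl2. It assumes commons-jexl2 and javafx.util.Pair are on the classpath, as the patch itself does; the Datum class is a hypothetical stand-in for org.apache.nutch.hostdb.HostDatum, whose getFetched() getter is what JEXL resolves for currentHostDatum.fetched:

import org.apache.commons.jexl2.Expression;
import org.apache.commons.jexl2.JexlContext;
import org.apache.commons.jexl2.JexlEngine;
import org.apache.commons.jexl2.MapContext;

public class DeltaExpressionSketch {

  // Hypothetical stand-in for org.apache.nutch.hostdb.HostDatum
  public static class Datum {
    private final long fetched;
    public Datum(long fetched) { this.fetched = fetched; }
    public long getFetched() { return fetched; }
  }

  public static void main(String[] args) {
    JexlEngine jexl = new JexlEngine();
    // The delta expression from the property description above
    Expression expr = jexl.createExpression(
        "new('javafx.util.Pair', 'FetchedDelta', "
            + "currentHostDatum.fetched - previousHostDatum.fetched)");

    // The reducer binds the current and the previous HostDatum like this
    JexlContext jcontext = new MapContext();
    jcontext.set("currentHostDatum", new Datum(120));
    jcontext.set("previousHostDatum", new Datum(100));

    // Prints "FetchedDelta=20"; the reducer stores the pair's value as a
    // FloatWritable (numbers) or Text (anything else) in the host metadata
    System.out.println(expr.evaluate(jcontext));
  }
}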
diff --git a/src/java/org/apache/nutch/hostdb/ReadHostDb.java b/src/java/org/apache/nutch/hostdb/ReadHostDb.java
index eac3bf6455..010327ebd6 100644
--- a/src/java/org/apache/nutch/hostdb/ReadHostDb.java
+++ b/src/java/org/apache/nutch/hostdb/ReadHostDb.java
@@ -21,46 +21,41 @@
 import java.text.SimpleDateFormat;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlContext;
+import org.apache.commons.jexl2.MapContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.StringUtil;
 import org.apache.nutch.util.TimingUtil;
-import org.apache.nutch.util.URLUtil;
-
-import org.apache.commons.jexl2.JexlContext;
-import org.apache.commons.jexl2.Expression;
-import org.apache.commons.jexl2.JexlEngine;
-import org.apache.commons.jexl2.MapContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * @see Commons
+ * @see Commons
  */
 public class ReadHostDb extends Configured implements Tool {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles
+      .lookup().lookupClass());
 
   public static final String HOSTDB_DUMP_HEADER = "hostdb.dump.field.header";
   public static final String HOSTDB_DUMP_HOSTNAMES = "hostdb.dump.hostnames";
@@ -75,33 +70,33 @@ static class ReadHostDbMapper extends Mapper<Text, HostDatum, Text, Text> {
     protected Expression expr = null;
 
     public void setup(Context context) {
-      dumpHomepages = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOMEPAGES, false);
-      dumpHostnames = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOSTNAMES, false);
-      fieldHeader = context.getConfiguration().getBoolean(HOSTDB_DUMP_HEADER, true);
+      dumpHomepages = context.getConfiguration().getBoolean(
+          HOSTDB_DUMP_HOMEPAGES, false);
+      dumpHostnames = context.getConfiguration().getBoolean(
+          HOSTDB_DUMP_HOSTNAMES, false);
+      fieldHeader = context.getConfiguration().getBoolean(HOSTDB_DUMP_HEADER,
+          true);
       String expr = context.getConfiguration().get(HOSTDB_FILTER_EXPRESSION);
       if (expr != null) {
-        // Create or retrieve a JexlEngine
-        JexlEngine jexl = new JexlEngine();
-        
-        // Dont't be silent and be strict
-        jexl.setSilent(true);
-        jexl.setStrict(true);
-        
-        // Create an expression object
-        this.expr = jexl.createExpression(expr);
+        this.expr = org.apache.nutch.util.JexlUtil.parseExpression(expr);
       }
     }
 
-    public void map(Text key, HostDatum datum, Context context) throws IOException, InterruptedException {
+    public void map(Text key, HostDatum datum, Context context)
+        throws IOException, InterruptedException {
       if (fieldHeader && !dumpHomepages && !dumpHostnames) {
-        context.write(new Text("hostname"), new Text("unfetched\tfetched\tgone\tredirTemp\tredirPerm\tredirSum\tok\tnumRecords\tdnsFail\tcnxFail\tsumFail\tscore\tlastCheck\thomepage\tmetadata"));
+        context
+            .write(
+                new Text("hostname"),
+                new Text(
+                    "unfetched\tfetched\tgone\tredirTemp\tredirPerm\tredirSum\tok\tnumRecords\tdnsFail\tcnxFail\tsumFail\tscore\tlastCheck\thomepage\tmetadata"));
         fieldHeader = false;
       }
-      
+
       if (expr != null) {
         // Create a context and add data
         JexlContext jcontext = new MapContext();
-        
+
         // Set some fixed variables
         jcontext.set("unfetched", datum.getUnfetched());
         jcontext.set("fetched", datum.getFetched());
@@ -114,24 +109,25 @@ public void map(Text key, HostDatum datum, Context context) throws IOException,
         jcontext.set("numRecords", datum.numRecords());
         jcontext.set("dnsFailures", datum.getDnsFailures());
         jcontext.set("connectionFailures", datum.getConnectionFailures());
-        
+
         // Set metadata variables
-        for (Map.Entry<Writable, Writable> entry : datum.getMetaData().entrySet()) {
+        for (Map.Entry<Writable, Writable> entry : datum.getMetaData()
+            .entrySet()) {
           Object value = entry.getValue();
-          
+
           if (value instanceof FloatWritable) {
-            FloatWritable fvalue = (FloatWritable)value;
-            Text tkey = (Text)entry.getKey();
+            FloatWritable fvalue = (FloatWritable) value;
+            Text tkey = (Text) entry.getKey();
             jcontext.set(tkey.toString(), fvalue.get());
           }
-          
+
           if (value instanceof IntWritable) {
-            IntWritable ivalue = (IntWritable)value;
-            Text tkey = (Text)entry.getKey();
+            IntWritable ivalue = (IntWritable) value;
+            Text tkey = (Text) entry.getKey();
             jcontext.set(tkey.toString(), ivalue.get());
           }
         }
-        
+
         // Filter this record if evaluation did not pass
         try {
           if (!Boolean.TRUE.equals(expr.evaluate(jcontext))) {
@@ -141,35 +137,38 @@ public void map(Text key, HostDatum datum, Context context) throws IOException,
           LOG.info(e.toString() + " for " + key.toString());
         }
       }
-      
+
       if (dumpHomepages) {
         if (datum.hasHomepageUrl()) {
           context.write(new Text(datum.getHomepageUrl()), emptyText);
         }
         return;
       }
-      
+
       if (dumpHostnames) {
         context.write(key, emptyText);
         return;
       }
-      
+
       // Write anyway
       context.write(key, new Text(datum.toString()));
     }
   }
 
-  // Todo, reduce unknown hosts to single unknown domain if possible. Enable via configuration
+  // Todo, reduce unknown hosts to single unknown domain if possible. Enable via
+  // configuration
   // host_a.example.org,host_a.example.org ==> example.org
-//  static class ReadHostDbReducer extends Reduce {
-//    public void setup(Context context) { }
-//
-//    public void reduce(Text domain, Iterable<Text> hosts, Context context) throws IOException, InterruptedException {
-//
-//    }
-//  }
-
-  private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, boolean dumpHostnames, String expr) throws Exception {
+  // static class ReadHostDbReducer extends Reduce {
+  // public void setup(Context context) { }
+  //
+  // public void reduce(Text domain, Iterable<Text> hosts, Context context)
+  // throws IOException, InterruptedException {
+  //
+  // }
+  // }
+
+  private void readHostDb(Path hostDb, Path output, boolean dumpHomepages,
+      boolean dumpHostnames, String expr) throws Exception {
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
     LOG.info("ReadHostDb: starting at " + sdf.format(start));
@@ -182,7 +181,7 @@ private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, boolean
     }
     conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
     conf.set("mapred.textoutputformat.separator", "\t");
-    
+
     Job job = Job.getInstance(conf);
     job.setJobName("ReadHostDb");
     job.setJarByClass(ReadHostDb.class);
@@ -217,22 +216,24 @@ private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, boolean
     }
 
     long end = System.currentTimeMillis();
-    LOG.info("ReadHostDb: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+    LOG.info("ReadHostDb: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
   }
-  
+
   private void getHostDbRecord(Path hostDb, String host) throws Exception {
     Configuration conf = getConf();
-    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(conf, hostDb);
+    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(conf,
+        hostDb);
 
     Class<?> keyClass = readers[0].getKeyClass();
     Class<?> valueClass = readers[0].getValueClass();
-    
+
     if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
       throw new IOException("Incompatible key (" + keyClass.getName() + ")");
-      
+
     Text key = (Text) keyClass.newInstance();
     HostDatum value = (HostDatum) valueClass.newInstance();
-    
+
     for (int i = 0; i < readers.length; i++) {
       while (readers[i].next(key, value)) {
         if (host.equals(key.toString())) {
@@ -240,17 +241,19 @@ private void getHostDbRecord(Path hostDb, String host) throws Exception {
         }
       }
       readers[i].close();
-    }    
+    }
   }
 
   public static void main(String args[]) throws Exception {
-    int res = ToolRunner.run(NutchConfiguration.create(), new ReadHostDb(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new ReadHostDb(),
+        args);
     System.exit(res);
   }
 
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
-      System.err.println("Usage: ReadHostDb <hostdb> [-get <url>] [<output> [-dumpHomepages | -dumpHostnames | -expr <expr>]]");
+      System.err
+          .println("Usage: ReadHostDb <hostdb> [-get <url>] [<output> [-dumpHomepages | -dumpHostnames | -expr <expr>]]");
       return -1;
     }
 
@@ -270,7 +273,7 @@ public int run(String[] args) throws Exception {
       }
       if (args[i].equals("-get")) {
        get = args[i + 1];
-        LOG.info("ReadHostDb: get: "+ get);
+        LOG.info("ReadHostDb: get: " + get);
        i++;
      }
      if (args[i].equals("-expr")) {
@@ -284,7 +287,8 @@ public int run(String[] args) throws Exception {
       if (get != null) {
         getHostDbRecord(new Path(args[0], "current"), get);
       } else {
-        readHostDb(new Path(args[0]), new Path(args[1]), dumpHomepages, dumpHostnames, expr);
+        readHostDb(new Path(args[0]), new Path(args[1]), dumpHomepages,
+            dumpHostnames, expr);
       }
       return 0;
     } catch (Exception e) {
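The -expr option above takes a JEXL filter over the variables that ReadHostDbMapper binds into its context (unfetched, fetched, numRecords, dnsFailures, connectionFailures and the other counters set above, plus any collected metadata fields). A minimal sketch of that evaluation with illustrative values; a record is kept only when the expression yields Boolean.TRUE:

import org.apache.commons.jexl2.Expression;
import org.apache.commons.jexl2.JexlContext;
import org.apache.commons.jexl2.JexlEngine;
import org.apache.commons.jexl2.MapContext;

public class HostDbFilterSketch {
  public static void main(String[] args) {
    JexlEngine jexl = new JexlEngine();
    jexl.setStrict(true);

    // e.g. dump only hosts with a large backlog of unfetched pages
    Expression filter = jexl.createExpression("unfetched > 1000");

    JexlContext jcontext = new MapContext();
    jcontext.set("unfetched", 2500); // illustrative value

    System.out.println(filter.evaluate(jcontext)); // true, so keep the record
  }
}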
diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
index 6a3c4a9e8a..d422d02629 100644
--- a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
+++ b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
@@ -52,7 +52,6 @@
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.TimingUtil;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,6 +77,7 @@ public class UpdateHostDb extends Configured implements Tool {
   public static final String HOSTDB_NUMERIC_FIELDS = "hostdb.numeric.fields";
   public static final String HOSTDB_STRING_FIELDS = "hostdb.string.fields";
   public static final String HOSTDB_PERCENTILES = "hostdb.percentiles";
+  public static final String HOSTDB_UPDATEDB_DELTA_EXPRESSION = "hostdb.deltaExpression";
 
   private void updateHostDb(Path hostDb, Path crawlDb, Path topHosts,
       boolean checkFailed, boolean checkNew, boolean checkKnown,
diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java b/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
index 69df4d17f0..3946204bd1 100644
--- a/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
+++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
@@ -19,7 +19,7 @@
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -27,7 +27,6 @@
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.CrawlDb;
 import org.apache.nutch.crawl.NutchWritable;
@@ -36,7 +35,6 @@
 import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.protocol.ProtocolStatus;
 import org.apache.nutch.util.URLUtil;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +60,7 @@ public class UpdateHostDbMapper
   protected URLFilters filters = null;
   protected URLNormalizers normalizers = null;
+  private boolean isDeltaStatisticCalculated = false;
 
   public void close() {}
 
   /**
@@ -71,7 +70,8 @@ public void configure(JobConf job) {
     readingCrawlDb = job.getBoolean("hostdb.reading.crawldb", false);
     filter = job.getBoolean(UpdateHostDb.HOSTDB_URL_FILTERING, false);
     normalize = job.getBoolean(UpdateHostDb.HOSTDB_URL_NORMALIZING, false);
-    
+    isDeltaStatisticCalculated = !StringUtils.isEmpty(job.get(UpdateHostDb.HOSTDB_UPDATEDB_DELTA_EXPRESSION));
+
     if (filter)
       filters = new URLFilters(job);
     if (normalize)
@@ -212,9 +212,10 @@ public void map(Text key, Writable value,
 
     // If we're also reading CrawlDb entries, reset db_* statistics because
     // we're aggregating them from CrawlDB anyway
-    if (readingCrawlDb) {
+
+    if (readingCrawlDb && !isDeltaStatisticCalculated) {
       hostDatum.resetStatistics();
-    }    
+    }
 
     output.collect(key, new NutchWritable(hostDatum));
   }
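The guard above is the mapper-side half of the delta feature: when hostdb.deltaExpression is set, the statistics of the existing HostDatum are deliberately kept, because the reducer below clones them as the previous state (previousHostDatum) and only then resets and re-aggregates them from the CrawlDb. A compressed, self-contained sketch of that flow, with Datum as a hypothetical stand-in for HostDatum:

public class DeltaFlowSketch {

  // Hypothetical stand-in for HostDatum's statistics handling
  static class Datum {
    long fetched;
    Datum(long fetched) { this.fetched = fetched; }
    Datum copy() { return new Datum(fetched); } // stands in for clone()
    void resetStatistics() { fetched = 0; }
  }

  public static void main(String[] args) {
    // State carried over from the previous HostDb (not reset by the mapper)
    Datum datum = new Datum(100);

    // Reducer side: snapshot the previous state, then reset and re-aggregate
    Datum previous = datum.copy();
    datum.resetStatistics();
    datum.fetched += 120; // stands in for counting CrawlDatum statuses

    // This difference is what a delta expression such as
    // currentHostDatum.fetched - previousHostDatum.fetched evaluates to
    System.out.println("FetchedDelta = " + (datum.fetched - previous.fetched));
  }
}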
diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
index 6ebe3d34f5..32de2efd8c 100644
--- a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
+++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
@@ -19,14 +19,19 @@
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.HashMap;
-import java.util.Map;
 
+import javafx.util.Pair;
+
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlContext;
+import org.apache.commons.jexl2.MapContext;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -36,24 +41,22 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
-
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.NutchWritable;
-
-import com.tdunning.math.stats.TDigest;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.tdunning.math.stats.TDigest;
+
 /**
  *
  *
  */
-public class UpdateHostDbReducer
-  implements Reducer<Text, NutchWritable, Text, HostDatum> {
+public class UpdateHostDbReducer implements
+    Reducer<Text, NutchWritable, Text, HostDatum> {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles
+      .lookup().lookupClass());
 
   protected ResolverThread resolverThread = null;
   protected Integer numResolverThreads = 10;
   protected static Integer purgeFailedHostsThreshold = -1;
@@ -68,18 +71,23 @@ public class UpdateHostDbReducer
   protected static int[] percentiles;
   protected static Text[] numericFieldWritables;
   protected static Text[] stringFieldWritables;
-  
+
   protected BlockingQueue<Runnable> queue = new SynchronousQueue<>();
   protected ThreadPoolExecutor executor = null;
 
+  protected String stringDeltaExpression = null;
+  protected Expression deltaExpression = null;
+
   /**
-   * Configures the thread pool and prestarts all resolver threads.
-   *
-   * @param job
-   */
+   * Configures the thread pool and prestarts all resolver threads.
+   *
+   * @param job
+   */
   public void configure(JobConf job) {
-    purgeFailedHostsThreshold = job.getInt(UpdateHostDb.HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD, -1);
-    numResolverThreads = job.getInt(UpdateHostDb.HOSTDB_NUM_RESOLVER_THREADS, 10);
+    purgeFailedHostsThreshold = job.getInt(
+        UpdateHostDb.HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD, -1);
+    numResolverThreads = job.getInt(UpdateHostDb.HOSTDB_NUM_RESOLVER_THREADS,
+        10);
     recheckInterval = job.getInt(UpdateHostDb.HOSTDB_RECHECK_INTERVAL, 86400) * 1000;
     checkFailed = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_FAILED, false);
     checkNew = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_NEW, false);
@@ -88,7 +96,7 @@ public void configure(JobConf job) {
     numericFields = job.getStrings(UpdateHostDb.HOSTDB_NUMERIC_FIELDS);
     stringFields = job.getStrings(UpdateHostDb.HOSTDB_STRING_FIELDS);
     percentiles = job.getInts(UpdateHostDb.HOSTDB_PERCENTILES);
-    
+
     // What fields do we need to collect metadata from
     if (numericFields != null) {
       numericFieldWritables = new Text[numericFields.length];
@@ -96,7 +104,7 @@ public void configure(JobConf job) {
         numericFieldWritables[i] = new Text(numericFields[i]);
       }
     }
-    
+
     if (stringFields != null) {
       stringFieldWritables = new Text[stringFields.length];
       for (int i = 0; i < stringFields.length; i++) {
@@ -104,79 +112,106 @@ public void configure(JobConf job) {
       }
     }
 
+    stringDeltaExpression = job
+        .get(UpdateHostDb.HOSTDB_UPDATEDB_DELTA_EXPRESSION);
+
+    if (!org.apache.commons.lang3.StringUtils.isEmpty(stringDeltaExpression)) {
+      this.deltaExpression = org.apache.nutch.util.JexlUtil
+          .parseExpression(stringDeltaExpression);
+    }
+
     // Initialize the thread pool with our queue
     executor = new ThreadPoolExecutor(numResolverThreads, numResolverThreads,
-      5, TimeUnit.SECONDS, queue);
+        5, TimeUnit.SECONDS, queue);
 
     // Run all threads in the pool
     executor.prestartAllCoreThreads();
   }
 
+  public Pair<String, Object> evaluateDeltaExpression(
+      HostDatum currentHostDatum, HostDatum previousHostDatum) {
+    Pair<String, Object> results = null;
+
+    JexlContext jcontext = new MapContext();
+    jcontext.set("currentHostDatum", currentHostDatum);
+    jcontext.set("previousHostDatum", previousHostDatum);
+    try {
+      results = (Pair<String, Object>) deltaExpression.evaluate(jcontext);
+      return results;
+    } catch (Exception e) {
+      LOG.info("Evaluate delta expression exception: " + e.toString());
+    }
+    return results;
+  }
+
   /**
    *
    */
   public void reduce(Text key, Iterator<NutchWritable> values,
-    OutputCollector<Text, HostDatum> output, Reporter reporter) throws IOException {
-
-    Map<String, Map<String, Integer>> stringCounts = new HashMap<>();
-    Map<String, Float> maximums = new HashMap<>();
-    Map<String, Float> sums = new HashMap<>(); // used to calc averages
-    Map<String, Integer> counts = new HashMap<>(); // used to calc averages
-    Map<String, Float> minimums = new HashMap<>();
-    Map<String, TDigest> tdigests = new HashMap<String, TDigest>();
-    
+      OutputCollector<Text, HostDatum> output, Reporter reporter)
+      throws IOException {
+
+    Map<String, Map<String, Integer>> stringCounts = new HashMap<>();
+    Map<String, Float> maximums = new HashMap<>();
+    Map<String, Float> sums = new HashMap<>(); // used to calc averages
+    Map<String, Integer> counts = new HashMap<>(); // used to calc averages
+    Map<String, Float> minimums = new HashMap<>();
+    Map<String, TDigest> tdigests = new HashMap<String, TDigest>();
+
     HostDatum hostDatum = new HostDatum();
+    HostDatum historicalHostDatum = null;
     float score = 0;
-    
+
     if (stringFields != null) {
       for (int i = 0; i < stringFields.length; i++) {
         stringCounts.put(stringFields[i], new HashMap<>());
       }
     }
-    
+
     // Loop through all values until we find a non-empty HostDatum or use
     // an empty if this is a new host for the host db
     while (values.hasNext()) {
       Writable value = values.next().get();
-      
+
       // Count crawl datum status's and collect metadata from fields
      if (value instanceof CrawlDatum) {
-        CrawlDatum buffer = (CrawlDatum)value;
-        
+        CrawlDatum buffer = (CrawlDatum) value;
+
         // Set the correct status field
         switch (buffer.getStatus()) {
-          case CrawlDatum.STATUS_DB_UNFETCHED:
-            hostDatum.setUnfetched(hostDatum.getUnfetched() + 1);
-            break;
+        case CrawlDatum.STATUS_DB_UNFETCHED:
+          hostDatum.setUnfetched(hostDatum.getUnfetched() + 1);
+          break;
 
-          case CrawlDatum.STATUS_DB_FETCHED:
-            hostDatum.setFetched(hostDatum.getFetched() + 1);
-            break;
+        case CrawlDatum.STATUS_DB_FETCHED:
+          hostDatum.setFetched(hostDatum.getFetched() + 1);
+          break;
 
-          case CrawlDatum.STATUS_DB_GONE:
-            hostDatum.setGone(hostDatum.getGone() + 1);
-            break;
+        case CrawlDatum.STATUS_DB_GONE:
+          hostDatum.setGone(hostDatum.getGone() + 1);
+          break;
 
-          case CrawlDatum.STATUS_DB_REDIR_TEMP:
-            hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1);
-            break;
+        case CrawlDatum.STATUS_DB_REDIR_TEMP:
+          hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1);
+          break;
 
-          case CrawlDatum.STATUS_DB_REDIR_PERM:
-            hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1);
-            break;
+        case CrawlDatum.STATUS_DB_REDIR_PERM:
+          hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1);
+          break;
 
-          case CrawlDatum.STATUS_DB_NOTMODIFIED:
-            hostDatum.setNotModified(hostDatum.getNotModified() + 1);
-            break;
+        case CrawlDatum.STATUS_DB_NOTMODIFIED:
+          hostDatum.setNotModified(hostDatum.getNotModified() + 1);
+          break;
         }
-        
+
         // Record connection failures
         if (buffer.getRetriesSinceFetch() != 0) {
           hostDatum.incConnectionFailures();
         }
-        
+
         // Only gather metadata statistics for proper fetched pages
-        if (buffer.getStatus() == CrawlDatum.STATUS_DB_FETCHED || buffer.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
+        if (buffer.getStatus() == CrawlDatum.STATUS_DB_FETCHED
+            || buffer.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
           // Deal with the string fields
           if (stringFields != null) {
             for (int i = 0; i < stringFields.length; i++) {
@@ -185,15 +220,19 @@ public void reduce(Text key, Iterator<NutchWritable> values,
               // Get it!
               String metadataValue = null;
               try {
-                metadataValue = buffer.getMetaData().get(stringFieldWritables[i]).toString();
+                metadataValue = buffer.getMetaData()
+                    .get(stringFieldWritables[i]).toString();
               } catch (Exception e) {
-                LOG.error("Metadata field " + stringFields[i] + " is probably not a numeric value");
+                LOG.error("Metadata field " + stringFields[i]
+                    + " is probably not a numeric value");
               }
-            
+
               // Does the value exist?
-              if (stringCounts.get(stringFields[i]).containsKey(metadataValue)) {
+              if (stringCounts.get(stringFields[i])
+                  .containsKey(metadataValue)) {
                 // Yes, increment it
-                stringCounts.get(stringFields[i]).put(metadataValue, stringCounts.get(stringFields[i]).get(metadataValue) + 1);
+                stringCounts.get(stringFields[i]).put(metadataValue,
+                    stringCounts.get(stringFields[i]).get(metadataValue) + 1);
               } else {
                 // Create it!
                 stringCounts.get(stringFields[i]).put(metadataValue, 1);
@@ -201,7 +240,7 @@ public void reduce(Text key, Iterator<NutchWritable> values,
             }
           }
         }
-        
+
         // Deal with the numeric fields
         if (numericFields != null) {
           for (int i = 0; i < numericFields.length; i++) {
@@ -209,18 +248,19 @@ public void reduce(Text key, Iterator<NutchWritable> values,
             if (buffer.getMetaData().get(numericFieldWritables[i]) != null) {
               try {
                 // Get it!
-                Float metadataValue = Float.parseFloat(buffer.getMetaData().get(numericFieldWritables[i]).toString());
-                
+                Float metadataValue = Float.parseFloat(buffer.getMetaData()
+                    .get(numericFieldWritables[i]).toString());
+
                 // Does the median value exist?
                 if (tdigests.containsKey(numericFields[i])) {
                   tdigests.get(numericFields[i]).add(metadataValue);
                 } else {
                   // Create it!
                   TDigest tdigest = TDigest.createDigest(100);
-                  tdigest.add((double)metadataValue);
+                  tdigest.add((double) metadataValue);
                   tdigests.put(numericFields[i], tdigest);
                 }
-          
+
                 // Does the minimum value exist?
                 if (minimums.containsKey(numericFields[i])) {
                   // Write if this is lower than existing value
@@ -231,7 +271,7 @@ public void reduce(Text key, Iterator<NutchWritable> values,
                   // Create it!
                   minimums.put(numericFields[i], metadataValue);
                 }
-                
+
                 // Does the maximum value exist?
                 if (maximums.containsKey(numericFields[i])) {
                   // Write if this is lower than existing value
@@ -242,29 +282,43 @@ public void reduce(Text key, Iterator<NutchWritable> values,
                   // Create it!
                   maximums.put(numericFields[i], metadataValue);
                 }
-                
+
                 // Sum it up!
                 if (sums.containsKey(numericFields[i])) {
                   // Increment
-                  sums.put(numericFields[i], sums.get(numericFields[i]) + metadataValue);
-                  counts.put(numericFields[i], counts.get(numericFields[i]) + 1);
+                  sums.put(numericFields[i], sums.get(numericFields[i])
+                      + metadataValue);
+                  counts.put(numericFields[i],
+                      counts.get(numericFields[i]) + 1);
                 } else {
                   // Create it!
                   sums.put(numericFields[i], metadataValue);
                   counts.put(numericFields[i], 1);
                 }
               } catch (Exception e) {
-                LOG.error(e.getMessage() + " when processing values for " + key.toString());
+                LOG.error(e.getMessage() + " when processing values for "
+                    + key.toString());
               }
             }
           }
         }
       }
-      
-      //
+
+      //
       if (value instanceof HostDatum) {
-        HostDatum buffer = (HostDatum)value;
+        HostDatum buffer = (HostDatum) value;
+
+        if (!org.apache.commons.lang3.StringUtils
+            .isEmpty(stringDeltaExpression)) {
+          try {
+            historicalHostDatum = (HostDatum) buffer.clone();
+          } catch (CloneNotSupportedException e) {
+            historicalHostDatum = null;
+          } finally {
+            buffer.resetStatistics();
+          }
+        }
 
         // Check homepage URL
         if (buffer.hasHomepageUrl()) {
@@ -285,7 +339,7 @@ public void reduce(Text key, Iterator<NutchWritable> values,
         if (buffer.getConnectionFailures() > 0) {
           hostDatum.setConnectionFailures(buffer.getConnectionFailures());
         }
-        
+
         // Check metadata
         if (!buffer.getMetaData().isEmpty()) {
           hostDatum.setMetaData(buffer.getMetaData());
@@ -299,7 +353,7 @@ public void reduce(Text key, Iterator<NutchWritable> values,
 
       // Check for the score
       if (value instanceof FloatWritable) {
-        FloatWritable buffer = (FloatWritable)value;
+        FloatWritable buffer = (FloatWritable) value;
         score = buffer.get();
       }
     }
@@ -308,35 +362,70 @@ public void reduce(Text key, Iterator<NutchWritable> values,
     if (score > 0) {
       hostDatum.setScore(score);
     }
-    
+
     // Set metadata
-    for (Map.Entry<String, Map<String, Integer>> entry : stringCounts.entrySet()) {
-      for (Map.Entry<String, Integer> subEntry : entry.getValue().entrySet()) {
-        hostDatum.getMetaData().put(new Text(entry.getKey() + "." + subEntry.getKey()), new IntWritable(subEntry.getValue()));
+    if (deltaExpression != null) {
+      Pair<String, Object> deltaExpressionResultPair = evaluateDeltaExpression(
+          hostDatum, historicalHostDatum);
+
+      try {
+        if (deltaExpressionResultPair != null) {
+          Object deltaExpressionResultPairValue = deltaExpressionResultPair
+              .getValue();
+          if (deltaExpressionResultPairValue instanceof Number) {
+            hostDatum.getMetaData().put(
+                new Text(deltaExpressionResultPair.getKey()),
+                new FloatWritable(((Number) deltaExpressionResultPair
+                    .getValue()).floatValue()));
+          } else {
+            hostDatum.getMetaData().put(
+                new Text(deltaExpressionResultPair.getKey()),
+                new Text(deltaExpressionResultPair.getValue().toString()));
+          }
+        }
+      } catch (Exception e) {
+        LOG.info("Could not convert the deltaExpression result; the key must be a String and the value a Number or a String: "
+            + e.getMessage());
+      }
+    }
+
+    for (Map.Entry<String, Map<String, Integer>> entry : stringCounts
+        .entrySet()) {
+      for (Map.Entry<String, Integer> subEntry : entry.getValue().entrySet()) {
+        hostDatum.getMetaData().put(
+            new Text(entry.getKey() + "." + subEntry.getKey()),
+            new IntWritable(subEntry.getValue()));
       }
     }
     for (Map.Entry<String, Float> entry : maximums.entrySet()) {
-      hostDatum.getMetaData().put(new Text("max." + entry.getKey()), new FloatWritable(entry.getValue()));
+      hostDatum.getMetaData().put(new Text("max." + entry.getKey()),
+          new FloatWritable(entry.getValue()));
     }
     for (Map.Entry<String, Float> entry : sums.entrySet()) {
-      hostDatum.getMetaData().put(new Text("avg." + entry.getKey()), new FloatWritable(entry.getValue() / counts.get(entry.getKey())));
+      hostDatum.getMetaData().put(new Text("avg." + entry.getKey()),
+          new FloatWritable(entry.getValue() / counts.get(entry.getKey())));
     }
     for (Map.Entry<String, TDigest> entry : tdigests.entrySet()) {
       // Emit all percentiles
       for (int i = 0; i < percentiles.length; i++) {
-        hostDatum.getMetaData().put(new Text("pct" + Integer.toString(percentiles[i]) + "." + entry.getKey()), new FloatWritable((float)entry.getValue().quantile(0.5)));
+        hostDatum.getMetaData().put(
+            new Text("pct" + Integer.toString(percentiles[i]) + "."
+                + entry.getKey()),
+            new FloatWritable((float) entry.getValue().quantile(0.5)));
       }
-    }      
+    }
     for (Map.Entry<String, Float> entry : minimums.entrySet()) {
-      hostDatum.getMetaData().put(new Text("min." + entry.getKey()), new FloatWritable(entry.getValue()));
+      hostDatum.getMetaData().put(new Text("min." + entry.getKey()),
+          new FloatWritable(entry.getValue()));
     }
-    
+
     reporter.incrCounter("UpdateHostDb", "total_hosts", 1);
 
     // See if this record is to be checked
     if (shouldCheck(hostDatum)) {
       // Make an entry
-      resolverThread = new ResolverThread(key.toString(), hostDatum, output, reporter, purgeFailedHostsThreshold);
+      resolverThread = new ResolverThread(key.toString(), hostDatum, output,
+          reporter, purgeFailedHostsThreshold);
 
       // Add the entry to the queue (blocking)
       try {
@@ -357,11 +446,11 @@ public void reduce(Text key, Iterator<NutchWritable> values,
   }
 
   /**
-   * Determines whether a record should be checked.
-   *
-   * @param datum
-   * @return boolean
-   */
+   * Determines whether a record should be checked.
+   *
+   * @param datum
+   * @return boolean
+   */
   protected boolean shouldCheck(HostDatum datum) {
     // Whether a new record is to be checked
     if (checkNew && datum.isEmpty()) {
@@ -383,15 +472,16 @@ protected boolean shouldCheck(HostDatum datum) {
   }
 
   /**
-   * Determines whether a record is eligible for recheck.
-   *
-   * @param datum
-   * @return boolean
-   */
+   * Determines whether a record is eligible for recheck.
+   *
+   * @param datum
+   * @return boolean
+   */
   protected boolean isEligibleForCheck(HostDatum datum) {
     // Whether an existing host, known or unknown, if forced to be rechecked
-    if (force || datum.getLastCheck().getTime() +
-      (recheckInterval * datum.getDnsFailures() + 1) > now) {
+    if (force
+        || datum.getLastCheck().getTime()
+            + (recheckInterval * datum.getDnsFailures() + 1) > now) {
       return true;
     }
 
@@ -399,8 +489,8 @@ protected boolean isEligibleForCheck(HostDatum datum) {
   }
 
   /**
-   * Shut down all running threads and wait for completion.
-   */
+   * Shut down all running threads and wait for completion.
+   */
   public void close() {
     LOG.info("UpdateHostDb: feeder finished, waiting for shutdown");
@@ -414,7 +504,8 @@ public void close() {
     try {
       // Wait for the executor to shut down completely
      if (!executor.isTerminated()) {
-        LOG.info("UpdateHostDb: resolver threads waiting: " + Integer.toString(executor.getPoolSize()));
+        LOG.info("UpdateHostDb: resolver threads waiting: "
+            + Integer.toString(executor.getPoolSize()));
         Thread.sleep(1000);
       } else {
         // All is well, get out