From cc7b93addcda9542dd1cd1b11291d3c017389ad1 Mon Sep 17 00:00:00 2001 From: lewismc Date: Fri, 6 Mar 2026 16:00:16 -0800 Subject: [PATCH 1/6] NUTCH-3162 Latency metrics to properly merge data from all threads and tasks --- .../org/apache/nutch/fetcher/Fetcher.java | 68 +++++++- .../apache/nutch/fetcher/FetcherThread.java | 11 +- .../nutch/indexer/IndexerMapReduce.java | 91 ++++++++++- .../org/apache/nutch/indexer/IndexingJob.java | 20 +++ .../apache/nutch/metrics/ErrorTracker.java | 29 +++- .../apache/nutch/metrics/LatencyTracker.java | 145 ++++++++++++++---- .../apache/nutch/metrics/NutchMetrics.java | 7 + .../org/apache/nutch/parse/ParseSegment.java | 70 +++++++-- .../nutch/metrics/TestLatencyTracker.java | 87 +++++++++++ 9 files changed, 480 insertions(+), 48 deletions(-) create mode 100644 src/test/org/apache/nutch/metrics/TestLatencyTracker.java diff --git a/src/java/org/apache/nutch/fetcher/Fetcher.java b/src/java/org/apache/nutch/fetcher/Fetcher.java index 029d95ff7c..0b9b28ac47 100644 --- a/src/java/org/apache/nutch/fetcher/Fetcher.java +++ b/src/java/org/apache/nutch/fetcher/Fetcher.java @@ -33,12 +33,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; @@ -46,9 +48,12 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; + +import com.tdunning.math.stats.MergingDigest; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.NutchWritable; import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.metrics.LatencyTracker; import org.apache.nutch.metrics.NutchMetrics; import org.apache.nutch.util.MimeUtil; import org.apache.nutch.util.NutchConfiguration; @@ -223,9 +228,9 @@ public void run(Context innerContext) setup(innerContext); initCounters(innerContext); + LinkedList fetcherThreads = new LinkedList<>(); try { Configuration conf = innerContext.getConfiguration(); - LinkedList fetcherThreads = new LinkedList<>(); FetchItemQueues fetchQueues = new FetchItemQueues(conf); QueueFeeder feeder; @@ -494,11 +499,70 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) { } while (activeThreads.get() > 0); LOG.info("-activeThreads={}", activeThreads); } finally { + // Merge all thread latency trackers and emit once; emit TDigest for reducer + LatencyTracker mergedLatencyTracker = new LatencyTracker( + NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY); + for (FetcherThread fetcherThread : fetcherThreads) { + mergedLatencyTracker.merge(fetcherThread.getFetchLatencyTracker()); + } + mergedLatencyTracker.emitCountAndSumOnly(innerContext); + byte[] digestBytes = mergedLatencyTracker.toBytes(); + if (digestBytes.length > 0) { + innerContext.write(new Text(NutchMetrics.LATENCY_KEY), + new NutchWritable(new BytesWritable(digestBytes))); + } cleanup(innerContext); } } } + /** + * Reducer that passes through (url, datum) records and merges TDigests from + * map tasks to set job-level latency percentile counters. + */ + public static class FetcherReducer extends + Reducer { + + private static final Text LATENCY_KEY = new Text(NutchMetrics.LATENCY_KEY); + + @Override + public void reduce(Text key, Iterable values, + Context context) throws IOException, InterruptedException { + if (key.equals(LATENCY_KEY)) { + MergingDigest mergedDigest = null; + for (NutchWritable nutchWritable : values) { + if (nutchWritable.get() instanceof BytesWritable) { + BytesWritable digestBytesWritable = (BytesWritable) nutchWritable.get(); + byte[] digestBytes = digestBytesWritable.copyBytes(); + if (digestBytes != null && digestBytes.length > 0) { + MergingDigest digest = LatencyTracker.fromBytes(digestBytes); + if (digest != null) { + if (mergedDigest == null) { + mergedDigest = digest; + } else { + mergedDigest.add(digest); + } + } + } + } + } + // Set only percentile counters; count_total and sum_ms are already correct from task aggregation + if (mergedDigest != null) { + context.getCounter(NutchMetrics.GROUP_FETCHER, + NutchMetrics.FETCHER_LATENCY + LatencyTracker.SUFFIX_P50_MS).setValue((long) mergedDigest.quantile(0.50)); + context.getCounter(NutchMetrics.GROUP_FETCHER, + NutchMetrics.FETCHER_LATENCY + LatencyTracker.SUFFIX_P95_MS).setValue((long) mergedDigest.quantile(0.95)); + context.getCounter(NutchMetrics.GROUP_FETCHER, + NutchMetrics.FETCHER_LATENCY + LatencyTracker.SUFFIX_P99_MS).setValue((long) mergedDigest.quantile(0.99)); + } + return; + } + for (NutchWritable value : values) { + context.write(key, value); + } + } + } + public void fetch(Path segment, int threads) throws IOException, InterruptedException, ClassNotFoundException { @@ -561,6 +625,8 @@ public void fetch(Path segment, int threads) throws IOException, job.setInputFormatClass(InputFormat.class); job.setJarByClass(Fetcher.class); job.setMapperClass(Fetcher.FetcherRun.class); + job.setReducerClass(Fetcher.FetcherReducer.class); + job.setNumReduceTasks(1); FileOutputFormat.setOutputPath(job, segment); job.setOutputFormatClass(FetcherOutputFormat.class); diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java b/src/java/org/apache/nutch/fetcher/FetcherThread.java index d367a4f298..fe2b95c3bf 100644 --- a/src/java/org/apache/nutch/fetcher/FetcherThread.java +++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java @@ -567,8 +567,7 @@ public void run() { if (fit != null) { fetchQueues.finishFetchItem(fit); } - // Emit fetch latency metrics - fetchLatencyTracker.emitCounters(context); + // Latency metrics are merged and emitted once by FetcherRun in the mapper // Emit error metrics errorTracker.emitCounters(context); activeThreads.decrementAndGet(); // count threads @@ -577,6 +576,14 @@ public void run() { } } + /** + * Returns the fetch latency tracker for this thread so the mapper can merge + * all thread trackers and emit job-level percentiles. + */ + public LatencyTracker getFetchLatencyTracker() { + return fetchLatencyTracker; + } + private Text handleRedirect(FetchItem fit, String newUrl, boolean temp, String redirType) throws MalformedURLException, URLFilterException, InterruptedException { diff --git a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java index 50da12b8a2..5ffde8aa2b 100644 --- a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java +++ b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java @@ -21,21 +21,28 @@ import java.util.Collection; import java.util.Locale; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.StringUtils; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.CrawlDb; import org.apache.nutch.crawl.Inlinks; @@ -56,6 +63,8 @@ import org.apache.nutch.scoring.ScoringFilterException; import org.apache.nutch.scoring.ScoringFilters; +import com.tdunning.math.stats.MergingDigest; + /** *

* This class is typically invoked from within @@ -294,8 +303,21 @@ private void initCounters(Reducer.C @Override public void cleanup(Reducer.Context context) throws IOException, InterruptedException { - // Emit indexing latency metrics - indexLatencyTracker.emitCounters(context); + indexLatencyTracker.emitCountAndSumOnly(context); + byte[] digestBytes = indexLatencyTracker.toBytes(); + if (digestBytes.length > 0) { + Path outPath = FileOutputFormat.getOutputPath(context); + Path latencyDir = new Path(outPath, "_latency"); + Path latencyFile = new Path(latencyDir, context.getTaskAttemptID().toString() + ".seq"); + FileSystem fs = latencyFile.getFileSystem(context.getConfiguration()); + fs.mkdirs(latencyDir); + try (SequenceFile.Writer writer = SequenceFile.createWriter(context.getConfiguration(), + SequenceFile.Writer.file(latencyFile), + SequenceFile.Writer.keyClass(NullWritable.class), + SequenceFile.Writer.valueClass(BytesWritable.class))) { + writer.append(NullWritable.get(), new BytesWritable(digestBytes)); + } + } } @Override @@ -557,4 +579,67 @@ public static void initMRJob(Path crawlDb, Path linkDb, job.setMapOutputValueClass(NutchWritable.class); job.setOutputValueClass(NutchWritable.class); } + + /** Mapper for Indexer Latency Merge job: passes through (1, bytes). */ + public static class IndexerLatencyMergeMapper + extends Mapper { + private static final IntWritable ONE = new IntWritable(1); + + @Override + public void map(NullWritable key, BytesWritable value, Context context) + throws IOException, InterruptedException { + context.write(ONE, value); + } + } + + /** Reducer for Indexer Latency Merge job: merges TDigests and sets job counters. */ + public static class IndexerLatencyMergeReducer + extends Reducer { + + @Override + public void reduce(IntWritable key, Iterable values, Context context) + throws IOException, InterruptedException { + MergingDigest merged = null; + for (BytesWritable bw : values) { + byte[] bytes = bw.copyBytes(); + if (bytes != null && bytes.length > 0) { + MergingDigest d = LatencyTracker.fromBytes(bytes); + if (d != null) { + if (merged == null) { + merged = d; + } else { + merged.add(d); + } + } + } + } + if (merged != null) { + context.getCounter(NutchMetrics.GROUP_INDEXER, + NutchMetrics.INDEXER_LATENCY + LatencyTracker.SUFFIX_P50_MS).setValue((long) merged.quantile(0.50)); + context.getCounter(NutchMetrics.GROUP_INDEXER, + NutchMetrics.INDEXER_LATENCY + LatencyTracker.SUFFIX_P95_MS).setValue((long) merged.quantile(0.95)); + context.getCounter(NutchMetrics.GROUP_INDEXER, + NutchMetrics.INDEXER_LATENCY + LatencyTracker.SUFFIX_P99_MS).setValue((long) merged.quantile(0.99)); + } + } + } + + /** + * Runs a small job that merges TDigest side files from the indexer and sets + * job-level percentile counters. Call after the main index job succeeds. + */ + public static Job createLatencyMergeJob(Configuration conf, Path latencyDir) + throws IOException { + Job job = Job.getInstance(conf, "Nutch Indexer Latency Merge"); + job.setJarByClass(IndexerMapReduce.class); + FileInputFormat.addInputPath(job, latencyDir); + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setMapperClass(IndexerLatencyMergeMapper.class); + job.setReducerClass(IndexerLatencyMergeReducer.class); + job.setNumReduceTasks(1); + job.setOutputFormatClass(NullOutputFormat.class); + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(BytesWritable.class); + return job; + } } diff --git a/src/java/org/apache/nutch/indexer/IndexingJob.java b/src/java/org/apache/nutch/indexer/IndexingJob.java index 224b4118e6..cf29c091c0 100644 --- a/src/java/org/apache/nutch/indexer/IndexingJob.java +++ b/src/java/org/apache/nutch/indexer/IndexingJob.java @@ -30,6 +30,7 @@ import org.apache.commons.lang3.time.StopWatch; import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.metrics.ErrorTracker; import org.apache.nutch.metrics.NutchMetrics; import org.apache.nutch.segment.SegmentChecker; import org.apache.hadoop.conf.Configuration; @@ -37,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.util.StringUtils; @@ -143,6 +145,8 @@ public void index(Path crawlDb, Path linkDb, List segments, + RANDOM.nextInt()); FileOutputFormat.setOutputPath(job, tmp); + // Driver-level error tracking: categorization + LOG.error only (no job counters; see ErrorTracker Javadoc). + ErrorTracker errorTracker = new ErrorTracker(NutchMetrics.GROUP_INDEXER); try { try{ boolean success = job.waitForCompletion(true); @@ -155,6 +159,22 @@ public void index(Path crawlDb, Path linkDb, List segments, LOG.error(StringUtils.stringifyException(e)); throw e; } + Path latencyDir = new Path(tmp, "_latency"); + FileSystem fs = tmp.getFileSystem(conf); + if (fs.exists(latencyDir)) { + try { + Job mergeJob = IndexerMapReduce.createLatencyMergeJob(conf, latencyDir); + FileOutputFormat.setOutputPath(mergeJob, new Path(tmp, "_latency_merge_out")); + boolean mergeSuccess = mergeJob.waitForCompletion(true); + if (!mergeSuccess) { + LOG.error("Indexer Latency Merge job failed"); + errorTracker.recordError(ErrorTracker.ErrorType.OTHER); + } + } catch (IOException | InterruptedException | ClassNotFoundException e) { + LOG.error("Indexer Latency Merge job failed: {}", e.getMessage()); + errorTracker.recordError(e); + } + } LOG.info("Indexer: number of documents indexed, deleted, or skipped:"); for (Counter counter : job.getCounters() .getGroup(NutchMetrics.GROUP_INDEXER)) { diff --git a/src/java/org/apache/nutch/metrics/ErrorTracker.java b/src/java/org/apache/nutch/metrics/ErrorTracker.java index 1921071605..c4d84af0fa 100644 --- a/src/java/org/apache/nutch/metrics/ErrorTracker.java +++ b/src/java/org/apache/nutch/metrics/ErrorTracker.java @@ -34,10 +34,12 @@ * based on exception type. It uses a bounded set of error categories to stay within * Hadoop's counter limits (~120 counters). * - *

Usage: + *

Usage in mapper/reducer or task threads: *

  * // In mapper/reducer setup or thread initialization
  * errorTracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER);
+ * // or with context for cached counters:
+ * errorTracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER, context);
  * 
  * // When catching exceptions
  * try {
@@ -49,11 +51,20 @@
  * // Or with manual categorization
  * errorTracker.recordError(ErrorTracker.ErrorType.NETWORK);
  * 
- * // In cleanup - emit all error counters
+ * // In cleanup - emit all error counters to the job
  * errorTracker.emitCounters(context);
  * 
* - *

Emits the following counters: + *

Usage in driver/client code (no task context): + * When used in a job driver or other code that does not run inside a mapper/reducer, + * create an ErrorTracker with the single-argument constructor (counter group only). + * Call {@link #recordError(Throwable)} or {@link #recordError(ErrorTracker.ErrorType)} + * for consistent error categorization. Do not call {@link #emitCounters(TaskInputOutputContext)}; + * Hadoop counters can only be written from within a task, so counts remain in-memory only. + * This allows the same categorization and logging pattern (e.g. with LOG.error) as in + * tasks, without emitting to job counters. + * + *

Emits the following counters (when used inside a task and emitCounters is called): *

    *
  • errors_total - total number of errors across all categories
  • *
  • errors_network_total - network-related errors
  • @@ -104,9 +115,15 @@ public enum ErrorType { /** * Creates a new ErrorTracker for the specified counter group. * - *

    This constructor creates an ErrorTracker without cached counters. - * Call {@link #initCounters(TaskInputOutputContext)} in setup() to cache - * counter references for better performance. + *

    Use in mapper/reducer setup or thread initialization: call + * {@link #initCounters(TaskInputOutputContext)} in setup() to cache counter + * references, then {@link #emitCounters(TaskInputOutputContext)} in cleanup to + * emit counts to the job. + * + *

    Use in driver/client code (no task context): do not call initCounters or + * emitCounters. Only {@link #recordError(Throwable)} and + * {@link #recordError(ErrorTracker.ErrorType)} are used; counts stay in-memory + * for consistent categorization and logging (e.g. with LOG.error). * * @param group the Hadoop counter group name (e.g., NutchMetrics.GROUP_FETCHER) */ diff --git a/src/java/org/apache/nutch/metrics/LatencyTracker.java b/src/java/org/apache/nutch/metrics/LatencyTracker.java index 3777bb29e3..5725b1ddb6 100644 --- a/src/java/org/apache/nutch/metrics/LatencyTracker.java +++ b/src/java/org/apache/nutch/metrics/LatencyTracker.java @@ -16,31 +16,36 @@ */ package org.apache.nutch.metrics; +import java.nio.ByteBuffer; + import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import com.tdunning.math.stats.MergingDigest; import com.tdunning.math.stats.TDigest; /** * A utility class for tracking latency metrics using TDigest for percentile * calculation. - * - *

    This class wraps a TDigest data structure to collect latency samples and + * + *

    This class wraps a MergingDigest data structure to collect latency samples and * emit Hadoop counters with count, sum, and percentile values (p50, p95, p99). - * + * MergingDigest supports merging digests from multiple tasks for job-level percentile + * computation. + * *

    Usage: *

      * // In mapper/reducer setup
      * latencyTracker = new LatencyTracker(NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY);
    - * 
    + *
      * // During processing
      * long start = System.currentTimeMillis();
      * // ... operation ...
      * latencyTracker.record(System.currentTimeMillis() - start);
    - * 
    + *
      * // In cleanup
      * latencyTracker.emitCounters(context);
      * 
    - * + * *

    Emits the following counters: *

      *
    • {prefix}_count_total - total number of samples
    • @@ -49,7 +54,7 @@ *
    • {prefix}_p95_ms - 95th percentile latency
    • *
    • {prefix}_p99_ms - 99th percentile latency
    • *
    - * + * * @since 1.22 */ public class LatencyTracker { @@ -57,7 +62,18 @@ public class LatencyTracker { /** Default compression factor for TDigest (controls accuracy vs memory). */ private static final double DEFAULT_COMPRESSION = 100.0; - private final TDigest digest; + /** Counter name suffix for total sample count. */ + public static final String SUFFIX_COUNT_TOTAL = "_count_total"; + /** Counter name suffix for sum of latencies in milliseconds. */ + public static final String SUFFIX_SUM_MS = "_sum_ms"; + /** Counter name suffix for 50th percentile latency in milliseconds. */ + public static final String SUFFIX_P50_MS = "_p50_ms"; + /** Counter name suffix for 95th percentile latency in milliseconds. */ + public static final String SUFFIX_P95_MS = "_p95_ms"; + /** Counter name suffix for 99th percentile latency in milliseconds. */ + public static final String SUFFIX_P99_MS = "_p99_ms"; + + private final MergingDigest digest; private final String group; private final String prefix; private long count = 0; @@ -65,19 +81,19 @@ public class LatencyTracker { /** * Creates a new LatencyTracker. - * + * * @param group the Hadoop counter group name * @param prefix the prefix for counter names (e.g., "fetch_latency") */ public LatencyTracker(String group, String prefix) { - this.digest = TDigest.createDigest(DEFAULT_COMPRESSION); + this.digest = (MergingDigest) TDigest.createMergingDigest(DEFAULT_COMPRESSION); this.group = group; this.prefix = prefix; } /** * Records a latency sample. - * + * * @param latencyMs the latency in milliseconds */ public void record(long latencyMs) { @@ -86,9 +102,24 @@ public void record(long latencyMs) { sum += latencyMs; } + /** + * Merges another LatencyTracker's digest and aggregates count/sum into this one. + * Used to combine per-thread or per-task metrics before emitting or serializing. + * + * @param other the other tracker to merge in (not modified) + */ + public void merge(LatencyTracker other) { + if (other == null || other.count == 0) { + return; + } + digest.add(other.digest); + count += other.count; + sum += other.sum; + } + /** * Returns the number of recorded samples. - * + * * @return the count of recorded latency samples */ public long getCount() { @@ -97,7 +128,7 @@ public long getCount() { /** * Returns the sum of all recorded latencies. - * + * * @return the sum of latencies in milliseconds */ public long getSum() { @@ -106,7 +137,7 @@ public long getSum() { /** * Returns the percentile value for the given quantile. - * + * * @param quantile the quantile (0.0 to 1.0) * @return the percentile value in milliseconds */ @@ -117,28 +148,88 @@ public long getPercentile(double quantile) { return (long) digest.quantile(quantile); } + /** + * Serializes the digest to bytes for transmission to a reducer or side file. + * Returns an empty array if no samples have been recorded. + * + * @return serialized digest bytes, or empty array if count is 0 + */ + public byte[] toBytes() { + if (count == 0) { + return new byte[0]; + } + ByteBuffer buf = ByteBuffer.allocate(digest.smallByteSize()); + digest.asSmallBytes(buf); + return buf.array(); + } + + /** + * Deserializes a MergingDigest from bytes (as produced by {@link #toBytes()}). + * + * @param bytes serialized digest bytes + * @return MergingDigest instance, or null if bytes is null or empty + */ + public static MergingDigest fromBytes(byte[] bytes) { + if (bytes == null || bytes.length == 0) { + return null; + } + return MergingDigest.fromBytes(ByteBuffer.wrap(bytes)); + } + + /** + * Emits only count and sum counters (not percentiles). Use in mappers when + * a reducer will merge TDigests and set job-level percentile counters. + */ + public void emitCountAndSumOnly(TaskInputOutputContext context) { + context.getCounter(group, prefix + SUFFIX_COUNT_TOTAL).setValue(count); + context.getCounter(group, prefix + SUFFIX_SUM_MS).setValue(sum); + } + /** * Emits all latency counters to the Hadoop context. - * + * *

    Should be called once during cleanup to emit aggregated metrics. - * + * * @param context the Hadoop task context */ public void emitCounters(TaskInputOutputContext context) { - context.getCounter(group, prefix + "_count_total").setValue(count); - context.getCounter(group, prefix + "_sum_ms").setValue(sum); - + context.getCounter(group, prefix + SUFFIX_COUNT_TOTAL).setValue(count); + context.getCounter(group, prefix + SUFFIX_SUM_MS).setValue(sum); + if (count > 0) { - context.getCounter(group, prefix + "_p50_ms").setValue((long) digest.quantile(0.50)); - context.getCounter(group, prefix + "_p95_ms").setValue((long) digest.quantile(0.95)); - context.getCounter(group, prefix + "_p99_ms").setValue((long) digest.quantile(0.99)); + context.getCounter(group, prefix + SUFFIX_P50_MS).setValue((long) digest.quantile(0.50)); + context.getCounter(group, prefix + SUFFIX_P95_MS).setValue((long) digest.quantile(0.95)); + context.getCounter(group, prefix + SUFFIX_P99_MS).setValue((long) digest.quantile(0.99)); } else { // Set to 0 if no samples recorded - context.getCounter(group, prefix + "_p50_ms").setValue(0); - context.getCounter(group, prefix + "_p95_ms").setValue(0); - context.getCounter(group, prefix + "_p99_ms").setValue(0); + context.getCounter(group, prefix + SUFFIX_P50_MS).setValue(0); + context.getCounter(group, prefix + SUFFIX_P95_MS).setValue(0); + context.getCounter(group, prefix + SUFFIX_P99_MS).setValue(0); } } -} - + /** + * Sets job-level percentile counters from a merged digest (e.g. in a reducer + * that merged TDigests from all tasks). Uses the same counter names as + * {@link #emitCounters(TaskInputOutputContext)}. + * + * @param context the Hadoop task context + * @param mergedCount total count from merged digest + * @param mergedSum total sum from merged digest + * @param mergedDigest the merged MergingDigest (may be null if mergedCount is 0) + */ + public static void setJobLevelCounters(TaskInputOutputContext context, + String group, String prefix, long mergedCount, long mergedSum, MergingDigest mergedDigest) { + context.getCounter(group, prefix + SUFFIX_COUNT_TOTAL).setValue(mergedCount); + context.getCounter(group, prefix + SUFFIX_SUM_MS).setValue(mergedSum); + if (mergedCount > 0 && mergedDigest != null) { + context.getCounter(group, prefix + SUFFIX_P50_MS).setValue((long) mergedDigest.quantile(0.50)); + context.getCounter(group, prefix + SUFFIX_P95_MS).setValue((long) mergedDigest.quantile(0.95)); + context.getCounter(group, prefix + SUFFIX_P99_MS).setValue((long) mergedDigest.quantile(0.99)); + } else { + context.getCounter(group, prefix + SUFFIX_P50_MS).setValue(0); + context.getCounter(group, prefix + SUFFIX_P95_MS).setValue(0); + context.getCounter(group, prefix + SUFFIX_P99_MS).setValue(0); + } + } +} diff --git a/src/java/org/apache/nutch/metrics/NutchMetrics.java b/src/java/org/apache/nutch/metrics/NutchMetrics.java index ef4fe79e57..a761a9f0d8 100644 --- a/src/java/org/apache/nutch/metrics/NutchMetrics.java +++ b/src/java/org/apache/nutch/metrics/NutchMetrics.java @@ -372,6 +372,13 @@ private NutchMetrics() { */ public static final String INDEXER_LATENCY = "index_latency"; + /** + * Special key used in map output to send serialized TDigest bytes to the + * reducer for job-level percentile merge. Reducers detect this key and merge + * digests instead of writing to output. + */ + public static final String LATENCY_KEY = "__LATENCY__"; + // ========================================================================= // Common Error Counter Names (used with component-specific groups) // These constants are shared across all components for consistent error diff --git a/src/java/org/apache/nutch/parse/ParseSegment.java b/src/java/org/apache/nutch/parse/ParseSegment.java index 0b2a6f2290..bf5c00afda 100644 --- a/src/java/org/apache/nutch/parse/ParseSegment.java +++ b/src/java/org/apache/nutch/parse/ParseSegment.java @@ -46,9 +46,11 @@ import org.apache.nutch.scoring.ScoringFilters; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Partitioner; import java.io.File; import java.io.IOException; @@ -77,7 +79,7 @@ public ParseSegment(Configuration conf) { } public static class ParseSegmentMapper extends - Mapper, Content, Text, ParseImpl> { + Mapper, Content, Text, Writable> { private ParseUtil parseUtil; private Text newKey = new Text(); @@ -87,7 +89,7 @@ public static class ParseSegmentMapper extends private ErrorTracker errorTracker; @Override - public void setup(Mapper, Content, Text, ParseImpl>.Context context) { + public void setup(Mapper, Content, Text, Writable>.Context context) { Configuration conf = context.getConfiguration(); scfilters = new ScoringFilters(conf); skipTruncated = conf.getBoolean(SKIP_TRUNCATED, true); @@ -98,15 +100,18 @@ public void setup(Mapper, Content, Text, ParseImpl>.Contex } @Override - public void cleanup(Mapper, Content, Text, ParseImpl>.Context context) + public void cleanup(Mapper, Content, Text, Writable>.Context context) throws IOException, InterruptedException { - // Emit parse latency metrics - parseLatencyTracker.emitCounters(context); + parseLatencyTracker.emitCountAndSumOnly(context); + byte[] digestBytes = parseLatencyTracker.toBytes(); + if (digestBytes.length > 0) { + context.write(new Text(NutchMetrics.LATENCY_KEY), new BytesWritable(digestBytes)); + } } @Override public void map(WritableComparable key, Content content, - Context context) + Mapper, Content, Text, Writable>.Context context) throws IOException, InterruptedException { // convert on the fly from old UTF8 keys if (key instanceof Text) { @@ -235,15 +240,60 @@ public static boolean isTruncated(Content content) { return false; } + /** Sends LATENCY_KEY to partition 0 so one reducer merges all TDigests. */ + public static class ParseSegmentPartitioner extends Partitioner { + @Override + public int getPartition(Text key, Writable value, int numPartitions) { + if (numPartitions <= 1) { + return 0; + } + if (key.toString().equals(NutchMetrics.LATENCY_KEY)) { + return 0; + } + return (key.hashCode() & Integer.MAX_VALUE) % numPartitions; + } + } + public static class ParseSegmentReducer extends - Reducer { + Reducer { + + private static final Text LATENCY_KEY = new Text(NutchMetrics.LATENCY_KEY); @Override public void reduce(Text key, Iterable values, - Context context) + Reducer.Context context) throws IOException, InterruptedException { + if (key.equals(LATENCY_KEY)) { + com.tdunning.math.stats.MergingDigest merged = null; + for (Writable w : values) { + if (w instanceof BytesWritable) { + byte[] bytes = ((BytesWritable) w).copyBytes(); + if (bytes != null && bytes.length > 0) { + com.tdunning.math.stats.MergingDigest d = LatencyTracker.fromBytes(bytes); + if (d != null) { + if (merged == null) { + merged = d; + } else { + merged.add(d); + } + } + } + } + } + if (merged != null) { + context.getCounter(NutchMetrics.GROUP_PARSER, + NutchMetrics.PARSER_LATENCY + LatencyTracker.SUFFIX_P50_MS).setValue((long) merged.quantile(0.50)); + context.getCounter(NutchMetrics.GROUP_PARSER, + NutchMetrics.PARSER_LATENCY + LatencyTracker.SUFFIX_P95_MS).setValue((long) merged.quantile(0.95)); + context.getCounter(NutchMetrics.GROUP_PARSER, + NutchMetrics.PARSER_LATENCY + LatencyTracker.SUFFIX_P99_MS).setValue((long) merged.quantile(0.99)); + } + return; + } Iterator valuesIter = values.iterator(); - context.write(key, valuesIter.next()); // collect first value + if (valuesIter.hasNext()) { + context.write(key, (ParseImpl) valuesIter.next()); + } } } @@ -268,6 +318,8 @@ public void parse(Path segment) throws IOException, job.setJarByClass(ParseSegment.class); job.setMapperClass(ParseSegment.ParseSegmentMapper.class); job.setReducerClass(ParseSegment.ParseSegmentReducer.class); + job.setPartitionerClass(ParseSegment.ParseSegmentPartitioner.class); + job.setMapOutputValueClass(Writable.class); FileOutputFormat.setOutputPath(job, segment); job.setOutputFormatClass(ParseOutputFormat.class); diff --git a/src/test/org/apache/nutch/metrics/TestLatencyTracker.java b/src/test/org/apache/nutch/metrics/TestLatencyTracker.java new file mode 100644 index 0000000000..952b440da0 --- /dev/null +++ b/src/test/org/apache/nutch/metrics/TestLatencyTracker.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.metrics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link LatencyTracker} merge, serialization (toBytes/fromBytes), + * and percentile behavior. + */ +class TestLatencyTracker { + + private static final String GROUP = "test"; + private static final String PREFIX = "test_latency"; + + @Test + void testMergeAggregatesCountAndSum() { + LatencyTracker a = new LatencyTracker(GROUP, PREFIX); + a.record(10); + a.record(20); + LatencyTracker b = new LatencyTracker(GROUP, PREFIX); + b.record(30); + a.merge(b); + assertEquals(3, a.getCount()); + assertEquals(60, a.getSum()); + } + + @Test + void testMergeNullOrEmptyIsNoOp() { + LatencyTracker a = new LatencyTracker(GROUP, PREFIX); + a.record(5); + long countBefore = a.getCount(); + a.merge(null); + assertEquals(countBefore, a.getCount()); + LatencyTracker empty = new LatencyTracker(GROUP, PREFIX); + a.merge(empty); + assertEquals(countBefore, a.getCount()); + } + + @Test + void testToBytesFromBytesRoundTrip() { + LatencyTracker tracker = new LatencyTracker(GROUP, PREFIX); + tracker.record(100); + tracker.record(200); + tracker.record(300); + byte[] bytes = tracker.toBytes(); + assertNotNull(bytes); + assertEquals(3, tracker.getCount()); + com.tdunning.math.stats.MergingDigest restored = LatencyTracker.fromBytes(bytes); + assertNotNull(restored); + double q50 = restored.quantile(0.50); + assertTrue(q50 >= 100 && q50 <= 300); + } + + @Test + void testToBytesEmptyReturnsEmptyArray() { + LatencyTracker tracker = new LatencyTracker(GROUP, PREFIX); + byte[] bytes = tracker.toBytes(); + assertNotNull(bytes); + assertEquals(0, bytes.length); + } + + @Test + void testFromBytesNullOrEmptyReturnsNull() { + assertNull(LatencyTracker.fromBytes(null)); + assertNull(LatencyTracker.fromBytes(new byte[0])); + } +} From 117ed9a3434801a141454d58d33a55941e1015c7 Mon Sep 17 00:00:00 2001 From: lewismc Date: Wed, 11 Mar 2026 06:26:41 -0700 Subject: [PATCH 2/6] NUTCH-3162 Latency metrics to properly merge data from all threads and tasks --- .../nutch/metrics/TestLatencyTracker.java | 199 +++++++++++++++++- .../nutch/util/ReducerContextWrapper.java | 12 +- 2 files changed, 209 insertions(+), 2 deletions(-) diff --git a/src/test/org/apache/nutch/metrics/TestLatencyTracker.java b/src/test/org/apache/nutch/metrics/TestLatencyTracker.java index 952b440da0..d404f283c9 100644 --- a/src/test/org/apache/nutch/metrics/TestLatencyTracker.java +++ b/src/test/org/apache/nutch/metrics/TestLatencyTracker.java @@ -21,11 +21,25 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.ReducerContextWrapper; import org.junit.jupiter.api.Test; +import com.tdunning.math.stats.MergingDigest; +import com.tdunning.math.stats.TDigest; + /** * Unit tests for {@link LatencyTracker} merge, serialization (toBytes/fromBytes), - * and percentile behavior. + * percentile behavior, and counter emission. Counter-emitting tests use the real + * Hadoop Context and Counters via {@link ReducerContextWrapper} (no mocks of Hadoop). */ class TestLatencyTracker { @@ -84,4 +98,187 @@ void testFromBytesNullOrEmptyReturnsNull() { assertNull(LatencyTracker.fromBytes(null)); assertNull(LatencyTracker.fromBytes(new byte[0])); } + + // Integration-style tests: real Hadoop Context and Counters (no mocks). + // Uses ReducerContextWrapper to drive a reducer that emits latency counters. + + @Test + void testEmitCountAndSumOnlyUpdatesJobCounters() throws IOException, InterruptedException { + Configuration conf = NutchConfiguration.create(); + Map out = new HashMap<>(); + EmitCountAndSumOnlyReducer reducer = new EmitCountAndSumOnlyReducer(GROUP, PREFIX); + ReducerContextWrapper wrapper = + new ReducerContextWrapper<>(reducer, conf, out); + reducer.reduce(new Text("k"), Collections.singletonList(new Text("v")), wrapper.getContext()); + + assertEquals(2, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_COUNT_TOTAL).getValue()); + assertEquals(30, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_SUM_MS).getValue()); + } + + @Test + void testEmitCountersUpdatesJobCounters() throws IOException, InterruptedException { + Configuration conf = NutchConfiguration.create(); + Map out = new HashMap<>(); + EmitCountersReducer reducer = new EmitCountersReducer(GROUP, PREFIX); + ReducerContextWrapper wrapper = + new ReducerContextWrapper<>(reducer, conf, out); + reducer.reduce(new Text("k"), Collections.singletonList(new Text("v")), wrapper.getContext()); + + assertEquals(3, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_COUNT_TOTAL).getValue()); + assertEquals(600, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_SUM_MS).getValue()); + long p50 = wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P50_MS).getValue(); + long p95 = wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P95_MS).getValue(); + long p99 = wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P99_MS).getValue(); + assertTrue(p50 >= 100 && p50 <= 300); + assertTrue(p95 >= 100 && p95 <= 300); + assertTrue(p99 >= 100 && p99 <= 300); + } + + @Test + void testEmitCountersWithZeroSamplesSetsPercentilesToZero() throws IOException, InterruptedException { + Configuration conf = NutchConfiguration.create(); + Map out = new HashMap<>(); + EmitCountersZeroReducer reducer = new EmitCountersZeroReducer(GROUP, PREFIX); + ReducerContextWrapper wrapper = + new ReducerContextWrapper<>(reducer, conf, out); + reducer.reduce(new Text("k"), Collections.emptyList(), wrapper.getContext()); + + assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_COUNT_TOTAL).getValue()); + assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_SUM_MS).getValue()); + assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P50_MS).getValue()); + assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P95_MS).getValue()); + assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P99_MS).getValue()); + } + + @Test + void testSetJobLevelCountersUpdatesJobCounters() throws IOException, InterruptedException { + Configuration conf = NutchConfiguration.create(); + Map out = new HashMap<>(); + SetJobLevelCountersReducer reducer = new SetJobLevelCountersReducer(GROUP, PREFIX); + ReducerContextWrapper wrapper = + new ReducerContextWrapper<>(reducer, conf, out); + reducer.reduce(new Text("k"), Collections.singletonList(new Text("v")), wrapper.getContext()); + + assertEquals(3, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_COUNT_TOTAL).getValue()); + assertEquals(600, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_SUM_MS).getValue()); + long p50 = wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P50_MS).getValue(); + long p95 = wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P95_MS).getValue(); + long p99 = wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P99_MS).getValue(); + assertTrue(p50 >= 100 && p50 <= 300); + assertTrue(p95 >= 100 && p95 <= 300); + assertTrue(p99 >= 100 && p99 <= 300); + } + + @Test + void testSetJobLevelCountersWithZeroCountSetsPercentilesToZero() throws IOException, InterruptedException { + Configuration conf = NutchConfiguration.create(); + Map out = new HashMap<>(); + SetJobLevelCountersZeroReducer reducer = new SetJobLevelCountersZeroReducer(GROUP, PREFIX); + ReducerContextWrapper wrapper = + new ReducerContextWrapper<>(reducer, conf, out); + reducer.reduce(new Text("k"), Collections.emptyList(), wrapper.getContext()); + + assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_COUNT_TOTAL).getValue()); + assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_SUM_MS).getValue()); + assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P50_MS).getValue()); + assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P95_MS).getValue()); + assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P99_MS).getValue()); + } + + /** Reducer that emits only count and sum via LatencyTracker (real Context, no mocks). */ + private static final class EmitCountAndSumOnlyReducer extends Reducer { + private final String group; + private final String prefix; + + EmitCountAndSumOnlyReducer(String group, String prefix) { + this.group = group; + this.prefix = prefix; + } + + @Override + protected void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + LatencyTracker tracker = new LatencyTracker(group, prefix); + tracker.record(10); + tracker.record(20); + tracker.emitCountAndSumOnly(context); + } + } + + /** Reducer that emits count, sum, and percentiles via LatencyTracker (real Context, no mocks). */ + private static final class EmitCountersReducer extends Reducer { + private final String group; + private final String prefix; + + EmitCountersReducer(String group, String prefix) { + this.group = group; + this.prefix = prefix; + } + + @Override + protected void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + LatencyTracker tracker = new LatencyTracker(group, prefix); + tracker.record(100); + tracker.record(200); + tracker.record(300); + tracker.emitCounters(context); + } + } + + /** Reducer that emits counters with zero samples (percentiles set to 0). */ + private static final class EmitCountersZeroReducer extends Reducer { + private final String group; + private final String prefix; + + EmitCountersZeroReducer(String group, String prefix) { + this.group = group; + this.prefix = prefix; + } + + @Override + protected void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + LatencyTracker tracker = new LatencyTracker(group, prefix); + tracker.emitCounters(context); + } + } + + /** Reducer that calls setJobLevelCounters with a merged digest (real Context, no mocks). */ + private static final class SetJobLevelCountersReducer extends Reducer { + private final String group; + private final String prefix; + + SetJobLevelCountersReducer(String group, String prefix) { + this.group = group; + this.prefix = prefix; + } + + @Override + protected void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + MergingDigest digest = (MergingDigest) TDigest.createMergingDigest(100.0); + digest.add(100); + digest.add(200); + digest.add(300); + LatencyTracker.setJobLevelCounters(context, group, prefix, 3, 600, digest); + } + } + + /** Reducer that calls setJobLevelCounters with zero count (percentiles set to 0). */ + private static final class SetJobLevelCountersZeroReducer extends Reducer { + private final String group; + private final String prefix; + + SetJobLevelCountersZeroReducer(String group, String prefix) { + this.group = group; + this.prefix = prefix; + } + + @Override + protected void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + LatencyTracker.setJobLevelCounters(context, group, prefix, 0, 0, null); + } + } } diff --git a/src/test/org/apache/nutch/util/ReducerContextWrapper.java b/src/test/org/apache/nutch/util/ReducerContextWrapper.java index 196116c4cb..c508531095 100644 --- a/src/test/org/apache/nutch/util/ReducerContextWrapper.java +++ b/src/test/org/apache/nutch/util/ReducerContextWrapper.java @@ -90,7 +90,17 @@ public ReducerContextWrapper(Reducer reducer, public Reducer.Context getContext() { return context; } - + + /** + * Return the underlying counters updated by the context, for assertions in tests. + * Uses the real Hadoop mapred Counters API (no mocks). + * + * @return the counters instance + */ + public Counters getCounters() { + return counters; + } + private void initContext() { // most methods are not used in Nutch unit tests. context = reducer.new Context() { From 5028b1524c9e0dbe65615339ec1e041a58a50b0e Mon Sep 17 00:00:00 2001 From: lewismc Date: Wed, 11 Mar 2026 11:12:27 -0700 Subject: [PATCH 3/6] NUTCH-3162 Latency metrics to properly merge data from all threads and tasks --- .../org/apache/nutch/indexer/IndexingJob.java | 7 +- .../nutch/fetcher/TestFetcherReducer.java | 118 ++++++++++++++++++ .../nutch/indexer/TestIndexerMapReduce.java | 62 +++++++++ .../nutch/metrics/TestLatencyTracker.java | 21 ++++ .../apache/nutch/parse/TestParseSegment.java | 71 ++++++++++- 5 files changed, 275 insertions(+), 4 deletions(-) create mode 100644 src/test/org/apache/nutch/fetcher/TestFetcherReducer.java diff --git a/src/java/org/apache/nutch/indexer/IndexingJob.java b/src/java/org/apache/nutch/indexer/IndexingJob.java index cf29c091c0..ee8696c704 100644 --- a/src/java/org/apache/nutch/indexer/IndexingJob.java +++ b/src/java/org/apache/nutch/indexer/IndexingJob.java @@ -162,8 +162,7 @@ public void index(Path crawlDb, Path linkDb, List segments, Path latencyDir = new Path(tmp, "_latency"); FileSystem fs = tmp.getFileSystem(conf); if (fs.exists(latencyDir)) { - try { - Job mergeJob = IndexerMapReduce.createLatencyMergeJob(conf, latencyDir); + try (Job mergeJob = IndexerMapReduce.createLatencyMergeJob(conf, latencyDir)) { FileOutputFormat.setOutputPath(mergeJob, new Path(tmp, "_latency_merge_out")); boolean mergeSuccess = mergeJob.waitForCompletion(true); if (!mergeSuccess) { @@ -171,8 +170,12 @@ public void index(Path crawlDb, Path linkDb, List segments, errorTracker.recordError(ErrorTracker.ErrorType.OTHER); } } catch (IOException | InterruptedException | ClassNotFoundException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } LOG.error("Indexer Latency Merge job failed: {}", e.getMessage()); errorTracker.recordError(e); + throw e; } } LOG.info("Indexer: number of documents indexed, deleted, or skipped:"); diff --git a/src/test/org/apache/nutch/fetcher/TestFetcherReducer.java b/src/test/org/apache/nutch/fetcher/TestFetcherReducer.java new file mode 100644 index 0000000000..477f797c39 --- /dev/null +++ b/src/test/org/apache/nutch/fetcher/TestFetcherReducer.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.fetcher; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.NutchWritable; +import org.apache.nutch.metrics.LatencyTracker; +import org.apache.nutch.metrics.NutchMetrics; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.ReducerContextWrapper; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link Fetcher.FetcherReducer}: latency key branch (merge TDigests, + * set job-level counters) and pass-through branch (write url, datum). + */ +class TestFetcherReducer { + + @Test + void testReduceLatencyKeyMergesDigestsAndSetsCounters() throws IOException, InterruptedException { + Configuration conf = NutchConfiguration.create(); + Map out = new HashMap<>(); + Fetcher.FetcherReducer reducer = new Fetcher.FetcherReducer(); + ReducerContextWrapper wrapper = + new ReducerContextWrapper<>(reducer, conf, out); + + LatencyTracker tracker = new LatencyTracker(NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY); + tracker.record(100); + tracker.record(200); + tracker.record(300); + byte[] digestBytes = tracker.toBytes(); + List values = new ArrayList<>(); + values.add(new NutchWritable(new BytesWritable(digestBytes))); + + reducer.reduce(new Text(NutchMetrics.LATENCY_KEY), values, wrapper.getContext()); + + long p50 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_FETCHER, + NutchMetrics.FETCHER_LATENCY + LatencyTracker.SUFFIX_P50_MS).getValue(); + long p95 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_FETCHER, + NutchMetrics.FETCHER_LATENCY + LatencyTracker.SUFFIX_P95_MS).getValue(); + long p99 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_FETCHER, + NutchMetrics.FETCHER_LATENCY + LatencyTracker.SUFFIX_P99_MS).getValue(); + assertTrue(p50 >= 100 && p50 <= 300); + assertTrue(p95 >= 100 && p95 <= 300); + assertTrue(p99 >= 100 && p99 <= 300); + assertEquals(0, out.size()); + } + + @Test + void testReduceLatencyKeyWithMultipleDigestsMergesAndSetsCounters() throws IOException, InterruptedException { + Configuration conf = NutchConfiguration.create(); + Map out = new HashMap<>(); + Fetcher.FetcherReducer reducer = new Fetcher.FetcherReducer(); + ReducerContextWrapper wrapper = + new ReducerContextWrapper<>(reducer, conf, out); + + LatencyTracker t1 = new LatencyTracker(NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY); + t1.record(10); + LatencyTracker t2 = new LatencyTracker(NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY); + t2.record(90); + List values = new ArrayList<>(); + values.add(new NutchWritable(new BytesWritable(t1.toBytes()))); + values.add(new NutchWritable(new BytesWritable(t2.toBytes()))); + + reducer.reduce(new Text(NutchMetrics.LATENCY_KEY), values, wrapper.getContext()); + + long p50 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_FETCHER, + NutchMetrics.FETCHER_LATENCY + LatencyTracker.SUFFIX_P50_MS).getValue(); + assertTrue(p50 >= 10 && p50 <= 90); + assertEquals(0, out.size()); + } + + @Test + void testReducePassThroughWritesKeyValue() throws IOException, InterruptedException { + Configuration conf = NutchConfiguration.create(); + Map out = new HashMap<>(); + Fetcher.FetcherReducer reducer = new Fetcher.FetcherReducer(); + ReducerContextWrapper wrapper = + new ReducerContextWrapper<>(reducer, conf, out); + + Text url = new Text("http://example.com/"); + CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_FETCH_SUCCESS, 0, 0.0f); + List values = Collections.singletonList(new NutchWritable(datum)); + + reducer.reduce(url, values, wrapper.getContext()); + + assertEquals(1, out.size()); + assertTrue(out.containsKey(url)); + assertEquals(datum, out.get(url).get()); + } +} diff --git a/src/test/org/apache/nutch/indexer/TestIndexerMapReduce.java b/src/test/org/apache/nutch/indexer/TestIndexerMapReduce.java index 81dd192063..19d857c4ba 100644 --- a/src/test/org/apache/nutch/indexer/TestIndexerMapReduce.java +++ b/src/test/org/apache/nutch/indexer/TestIndexerMapReduce.java @@ -18,8 +18,16 @@ import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.util.StringUtils; +import org.apache.nutch.metrics.LatencyTracker; +import org.apache.nutch.metrics.NutchMetrics; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.NutchWritable; import org.apache.nutch.metadata.Metadata; @@ -40,12 +48,14 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; /** Test {@link IndexerMapReduce} */ public class TestIndexerMapReduce { @@ -189,4 +199,56 @@ public NutchDocument runIndexer(CrawlDatum dbDatum, CrawlDatum fetchDatum, return doc; } + /** + * IndexerLatencyMergeReducer merges BytesWritable TDigests and sets job-level + * percentile counters. Uses real Context and Counters (no mocks). + */ + @Test + void testIndexerLatencyMergeReducerSetsCounters() throws IOException, InterruptedException { + Configuration conf = NutchConfiguration.create(); + Map out = new HashMap<>(); + IndexerMapReduce.IndexerLatencyMergeReducer reducer = new IndexerMapReduce.IndexerLatencyMergeReducer(); + ReducerContextWrapper wrapper = + new ReducerContextWrapper<>(reducer, conf, out); + + LatencyTracker tracker = new LatencyTracker(NutchMetrics.GROUP_INDEXER, NutchMetrics.INDEXER_LATENCY); + tracker.record(50); + tracker.record(150); + tracker.record(250); + byte[] digestBytes = tracker.toBytes(); + List values = Collections.singletonList(new BytesWritable(digestBytes)); + + reducer.reduce(new IntWritable(1), values, wrapper.getContext()); + + long p50 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_INDEXER, + NutchMetrics.INDEXER_LATENCY + LatencyTracker.SUFFIX_P50_MS).getValue(); + long p95 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_INDEXER, + NutchMetrics.INDEXER_LATENCY + LatencyTracker.SUFFIX_P95_MS).getValue(); + long p99 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_INDEXER, + NutchMetrics.INDEXER_LATENCY + LatencyTracker.SUFFIX_P99_MS).getValue(); + assertTrue(p50 >= 50 && p50 <= 250); + assertTrue(p95 >= 50 && p95 <= 250); + assertTrue(p99 >= 50 && p99 <= 250); + } + + /** + * createLatencyMergeJob returns a Job with correct input path, mapper/reducer, + * numReduceTasks(1), and output config. + */ + @Test + void testCreateLatencyMergeJobConfig() throws IOException, ClassNotFoundException { + Configuration conf = NutchConfiguration.create(); + Path latencyDir = new Path("/tmp/latency"); + Job job = IndexerMapReduce.createLatencyMergeJob(conf, latencyDir); + + assertNotNull(job.getConfiguration().get(FileInputFormat.INPUT_DIR)); + assertTrue(job.getConfiguration().get(FileInputFormat.INPUT_DIR).contains("latency")); + assertEquals(SequenceFileInputFormat.class, job.getInputFormatClass()); + assertEquals(IndexerMapReduce.IndexerLatencyMergeMapper.class, job.getMapperClass()); + assertEquals(IndexerMapReduce.IndexerLatencyMergeReducer.class, job.getReducerClass()); + assertEquals(1, job.getNumReduceTasks()); + assertEquals(IntWritable.class, job.getOutputKeyClass()); + assertEquals(BytesWritable.class, job.getOutputValueClass()); + } + } diff --git a/src/test/org/apache/nutch/metrics/TestLatencyTracker.java b/src/test/org/apache/nutch/metrics/TestLatencyTracker.java index d404f283c9..1665d71707 100644 --- a/src/test/org/apache/nutch/metrics/TestLatencyTracker.java +++ b/src/test/org/apache/nutch/metrics/TestLatencyTracker.java @@ -99,6 +99,27 @@ void testFromBytesNullOrEmptyReturnsNull() { assertNull(LatencyTracker.fromBytes(new byte[0])); } + @Test + void testGetPercentileReturnsValueInRange() { + LatencyTracker tracker = new LatencyTracker(GROUP, PREFIX); + tracker.record(100); + tracker.record(200); + tracker.record(300); + long p50 = tracker.getPercentile(0.50); + long p95 = tracker.getPercentile(0.95); + long p99 = tracker.getPercentile(0.99); + assertTrue(p50 >= 100 && p50 <= 300); + assertTrue(p95 >= 100 && p95 <= 300); + assertTrue(p99 >= 100 && p99 <= 300); + } + + @Test + void testGetPercentileWithZeroSamplesReturnsZero() { + LatencyTracker tracker = new LatencyTracker(GROUP, PREFIX); + assertEquals(0, tracker.getPercentile(0.50)); + assertEquals(0, tracker.getPercentile(0.95)); + } + // Integration-style tests: real Hadoop Context and Counters (no mocks). // Uses ReducerContextWrapper to drive a reducer that emits latency counters. diff --git a/src/test/org/apache/nutch/parse/TestParseSegment.java b/src/test/org/apache/nutch/parse/TestParseSegment.java index d989c7a0b2..9feb9fecfa 100644 --- a/src/test/org/apache/nutch/parse/TestParseSegment.java +++ b/src/test/org/apache/nutch/parse/TestParseSegment.java @@ -16,14 +16,30 @@ */ package org.apache.nutch.parse; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.metrics.LatencyTracker; +import org.apache.nutch.metrics.NutchMetrics; import org.apache.nutch.net.protocols.Response; import org.apache.nutch.protocol.Content; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.ReducerContextWrapper; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; public class TestParseSegment { private static byte[] BYTES = "the quick brown fox".getBytes(StandardCharsets.UTF_8); @@ -77,4 +93,55 @@ public void testNoLengthField() { content.setContent(BYTES); assertFalse(ParseSegment.isTruncated(content)); } + + /** + * ParseSegmentReducer latency branch: when key is LATENCY_KEY, merges + * BytesWritable TDigests and sets job-level parser latency counters. + */ + @Test + void testParseSegmentReducerLatencyKeySetsCounters() throws IOException, InterruptedException { + Configuration conf = NutchConfiguration.create(); + Map out = new HashMap<>(); + ParseSegment.ParseSegmentReducer reducer = new ParseSegment.ParseSegmentReducer(); + ReducerContextWrapper wrapper = + new ReducerContextWrapper<>(reducer, conf, out); + + LatencyTracker tracker = new LatencyTracker(NutchMetrics.GROUP_PARSER, NutchMetrics.PARSER_LATENCY); + tracker.record(100); + tracker.record(200); + byte[] digestBytes = tracker.toBytes(); + List values = new ArrayList<>(); + values.add(new BytesWritable(digestBytes)); + + reducer.reduce(new Text(NutchMetrics.LATENCY_KEY), values, wrapper.getContext()); + + long p50 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_PARSER, + NutchMetrics.PARSER_LATENCY + LatencyTracker.SUFFIX_P50_MS).getValue(); + long p95 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_PARSER, + NutchMetrics.PARSER_LATENCY + LatencyTracker.SUFFIX_P95_MS).getValue(); + long p99 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_PARSER, + NutchMetrics.PARSER_LATENCY + LatencyTracker.SUFFIX_P99_MS).getValue(); + assertTrue(p50 >= 100 && p50 <= 200); + assertTrue(p95 >= 100 && p95 <= 200); + assertTrue(p99 >= 100 && p99 <= 200); + assertEquals(0, out.size()); + } + + /** + * ParseSegmentPartitioner sends LATENCY_KEY to partition 0 so one reducer + * merges all TDigests. + */ + @Test + void testParseSegmentPartitionerSendsLatencyKeyToPartitionZero() { + ParseSegment.ParseSegmentPartitioner partitioner = new ParseSegment.ParseSegmentPartitioner(); + int numPartitions = 4; + assertEquals(0, partitioner.getPartition(new Text(NutchMetrics.LATENCY_KEY), new BytesWritable(), numPartitions)); + } + + @Test + void testParseSegmentPartitionerWithSinglePartition() { + ParseSegment.ParseSegmentPartitioner partitioner = new ParseSegment.ParseSegmentPartitioner(); + assertEquals(0, partitioner.getPartition(new Text(NutchMetrics.LATENCY_KEY), new BytesWritable(), 1)); + assertEquals(0, partitioner.getPartition(new Text("http://example.com/"), new BytesWritable(), 1)); + } } From ba9c15a1b55f56e219706a4f5bbf5d247b04de6b Mon Sep 17 00:00:00 2001 From: lewismc Date: Wed, 11 Mar 2026 11:26:20 -0700 Subject: [PATCH 4/6] NUTCH-3162 Latency metrics to properly merge data from all threads and tasks --- .../apache/nutch/fetcher/FetcherThread.java | 1 - .../nutch/fetcher/TestFetcherReducer.java | 35 ++--- .../nutch/indexer/TestIndexerMapReduce.java | 19 +-- .../apache/nutch/metrics/LatencyTestUtil.java | 142 ++++++++++++++++++ .../nutch/metrics/TestLatencyTracker.java | 35 +---- .../apache/nutch/parse/TestParseSegment.java | 18 +-- 6 files changed, 168 insertions(+), 82 deletions(-) create mode 100644 src/test/org/apache/nutch/metrics/LatencyTestUtil.java diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java b/src/java/org/apache/nutch/fetcher/FetcherThread.java index fe2b95c3bf..830a4734f9 100644 --- a/src/java/org/apache/nutch/fetcher/FetcherThread.java +++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java @@ -567,7 +567,6 @@ public void run() { if (fit != null) { fetchQueues.finishFetchItem(fit); } - // Latency metrics are merged and emitted once by FetcherRun in the mapper // Emit error metrics errorTracker.emitCounters(context); activeThreads.decrementAndGet(); // count threads diff --git a/src/test/org/apache/nutch/fetcher/TestFetcherReducer.java b/src/test/org/apache/nutch/fetcher/TestFetcherReducer.java index 477f797c39..a1aa5f0ac4 100644 --- a/src/test/org/apache/nutch/fetcher/TestFetcherReducer.java +++ b/src/test/org/apache/nutch/fetcher/TestFetcherReducer.java @@ -31,8 +31,8 @@ import org.apache.hadoop.io.Text; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.NutchWritable; -import org.apache.nutch.metrics.LatencyTracker; import org.apache.nutch.metrics.NutchMetrics; +import org.apache.nutch.metrics.LatencyTestUtil; import org.apache.nutch.util.NutchConfiguration; import org.apache.nutch.util.ReducerContextWrapper; import org.junit.jupiter.api.Test; @@ -51,25 +51,14 @@ void testReduceLatencyKeyMergesDigestsAndSetsCounters() throws IOException, Inte ReducerContextWrapper wrapper = new ReducerContextWrapper<>(reducer, conf, out); - LatencyTracker tracker = new LatencyTracker(NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY); - tracker.record(100); - tracker.record(200); - tracker.record(300); - byte[] digestBytes = tracker.toBytes(); + byte[] digestBytes = LatencyTestUtil.createDigestBytes(100, 200, 300); List values = new ArrayList<>(); values.add(new NutchWritable(new BytesWritable(digestBytes))); reducer.reduce(new Text(NutchMetrics.LATENCY_KEY), values, wrapper.getContext()); - long p50 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_FETCHER, - NutchMetrics.FETCHER_LATENCY + LatencyTracker.SUFFIX_P50_MS).getValue(); - long p95 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_FETCHER, - NutchMetrics.FETCHER_LATENCY + LatencyTracker.SUFFIX_P95_MS).getValue(); - long p99 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_FETCHER, - NutchMetrics.FETCHER_LATENCY + LatencyTracker.SUFFIX_P99_MS).getValue(); - assertTrue(p50 >= 100 && p50 <= 300); - assertTrue(p95 >= 100 && p95 <= 300); - assertTrue(p99 >= 100 && p99 <= 300); + LatencyTestUtil.assertPercentilesInRange(wrapper.getCounters(), + NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY, 100, 300); assertEquals(0, out.size()); } @@ -81,19 +70,17 @@ void testReduceLatencyKeyWithMultipleDigestsMergesAndSetsCounters() throws IOExc ReducerContextWrapper wrapper = new ReducerContextWrapper<>(reducer, conf, out); - LatencyTracker t1 = new LatencyTracker(NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY); - t1.record(10); - LatencyTracker t2 = new LatencyTracker(NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY); - t2.record(90); + List digestWritables = LatencyTestUtil.createDigestBytesWritables( + new long[] { 10 }, new long[] { 90 }); List values = new ArrayList<>(); - values.add(new NutchWritable(new BytesWritable(t1.toBytes()))); - values.add(new NutchWritable(new BytesWritable(t2.toBytes()))); + for (BytesWritable bw : digestWritables) { + values.add(new NutchWritable(bw)); + } reducer.reduce(new Text(NutchMetrics.LATENCY_KEY), values, wrapper.getContext()); - long p50 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_FETCHER, - NutchMetrics.FETCHER_LATENCY + LatencyTracker.SUFFIX_P50_MS).getValue(); - assertTrue(p50 >= 10 && p50 <= 90); + LatencyTestUtil.assertPercentilesInRange(wrapper.getCounters(), + NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY, 10, 90); assertEquals(0, out.size()); } diff --git a/src/test/org/apache/nutch/indexer/TestIndexerMapReduce.java b/src/test/org/apache/nutch/indexer/TestIndexerMapReduce.java index 19d857c4ba..c2671bd66a 100644 --- a/src/test/org/apache/nutch/indexer/TestIndexerMapReduce.java +++ b/src/test/org/apache/nutch/indexer/TestIndexerMapReduce.java @@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.util.StringUtils; -import org.apache.nutch.metrics.LatencyTracker; +import org.apache.nutch.metrics.LatencyTestUtil; import org.apache.nutch.metrics.NutchMetrics; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.NutchWritable; @@ -211,24 +211,13 @@ void testIndexerLatencyMergeReducerSetsCounters() throws IOException, Interrupte ReducerContextWrapper wrapper = new ReducerContextWrapper<>(reducer, conf, out); - LatencyTracker tracker = new LatencyTracker(NutchMetrics.GROUP_INDEXER, NutchMetrics.INDEXER_LATENCY); - tracker.record(50); - tracker.record(150); - tracker.record(250); - byte[] digestBytes = tracker.toBytes(); + byte[] digestBytes = LatencyTestUtil.createDigestBytes(50, 150, 250); List values = Collections.singletonList(new BytesWritable(digestBytes)); reducer.reduce(new IntWritable(1), values, wrapper.getContext()); - long p50 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_INDEXER, - NutchMetrics.INDEXER_LATENCY + LatencyTracker.SUFFIX_P50_MS).getValue(); - long p95 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_INDEXER, - NutchMetrics.INDEXER_LATENCY + LatencyTracker.SUFFIX_P95_MS).getValue(); - long p99 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_INDEXER, - NutchMetrics.INDEXER_LATENCY + LatencyTracker.SUFFIX_P99_MS).getValue(); - assertTrue(p50 >= 50 && p50 <= 250); - assertTrue(p95 >= 50 && p95 <= 250); - assertTrue(p99 >= 50 && p99 <= 250); + LatencyTestUtil.assertPercentilesInRange(wrapper.getCounters(), + NutchMetrics.GROUP_INDEXER, NutchMetrics.INDEXER_LATENCY, 50, 250); } /** diff --git a/src/test/org/apache/nutch/metrics/LatencyTestUtil.java b/src/test/org/apache/nutch/metrics/LatencyTestUtil.java new file mode 100644 index 0000000000..0b4bc4f3cd --- /dev/null +++ b/src/test/org/apache/nutch/metrics/LatencyTestUtil.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.metrics; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.Counters; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Test utility for latency-tracking tests. Reduces boilerplate when testing + * Fetcher, ParseSegment, and Indexer reducers that merge TDigests and set + * job-level percentile counters. + * + *

    Use with the real Hadoop {@link Counters} from {@link org.apache.nutch.util.ReducerContextWrapper#getCounters()} + * (no mocks). + */ +public final class LatencyTestUtil { + + private static final String DUMMY_GROUP = "test"; + private static final String DUMMY_PREFIX = "latency"; + + private LatencyTestUtil() {} + + /** + * Builds serialized TDigest bytes from the given samples. Uses a temporary + * LatencyTracker with dummy group/prefix. Callers wrap the result as needed + * (e.g. {@code new BytesWritable(bytes)} or {@code new NutchWritable(new BytesWritable(bytes))}). + * + * @param samples latency values in milliseconds to record + * @return serialized digest as from {@link LatencyTracker#toBytes()} + */ + public static byte[] createDigestBytes(long... samples) { + LatencyTracker tracker = new LatencyTracker(DUMMY_GROUP, DUMMY_PREFIX); + for (long sample : samples) { + tracker.record(sample); + } + return tracker.toBytes(); + } + + /** + * Builds one BytesWritable per array of samples (e.g. one per map task). + * Useful for reducer tests that merge multiple digests. + * + * @param sampleArrays each array is recorded into one tracker and serialized to one BytesWritable + * @return list of digest BytesWritable, in order + */ + public static List createDigestBytesWritables(long[]... sampleArrays) { + List list = new ArrayList<>(sampleArrays.length); + for (long[] samples : sampleArrays) { + list.add(new BytesWritable(createDigestBytes(samples))); + } + return list; + } + + /** + * Asserts that the job-level percentile counters (p50, p95, p99) for the + * given group and prefix are in the range [minMs, maxMs]. Uses + * {@link LatencyTracker#SUFFIX_P50_MS} etc. + * + * @param counters counters from {@link org.apache.nutch.util.ReducerContextWrapper#getCounters()} + * @param group counter group (e.g. {@link NutchMetrics#GROUP_FETCHER}) + * @param prefix counter name prefix (e.g. {@link NutchMetrics#FETCHER_LATENCY}) + * @param minMs inclusive lower bound for all percentiles (ms) + * @param maxMs inclusive upper bound for all percentiles (ms) + */ + public static void assertPercentilesInRange(Counters counters, String group, String prefix, + long minMs, long maxMs) { + long p50 = counters.findCounter(group, prefix + LatencyTracker.SUFFIX_P50_MS).getValue(); + long p95 = counters.findCounter(group, prefix + LatencyTracker.SUFFIX_P95_MS).getValue(); + long p99 = counters.findCounter(group, prefix + LatencyTracker.SUFFIX_P99_MS).getValue(); + assertTrue(p50 >= minMs && p50 <= maxMs, + "p50=" + p50 + " not in [" + minMs + "," + maxMs + "]"); + assertTrue(p95 >= minMs && p95 <= maxMs, + "p95=" + p95 + " not in [" + minMs + "," + maxMs + "]"); + assertTrue(p99 >= minMs && p99 <= maxMs, + "p99=" + p99 + " not in [" + minMs + "," + maxMs + "]"); + } + + /** + * Asserts that the count and sum counters for the given group and prefix + * match the expected values. Uses {@link LatencyTracker#SUFFIX_COUNT_TOTAL} + * and {@link LatencyTracker#SUFFIX_SUM_MS}. + * + * @param counters counters from {@link org.apache.nutch.util.ReducerContextWrapper#getCounters()} + * @param group counter group + * @param prefix counter name prefix + * @param expectedCount expected _count_total value + * @param expectedSumMs expected _sum_ms value + */ + public static void assertCountAndSum(Counters counters, String group, String prefix, + long expectedCount, long expectedSumMs) { + assertEquals(expectedCount, + counters.findCounter(group, prefix + LatencyTracker.SUFFIX_COUNT_TOTAL).getValue()); + assertEquals(expectedSumMs, + counters.findCounter(group, prefix + LatencyTracker.SUFFIX_SUM_MS).getValue()); + } + + /** + * Asserts that the percentile counters (p50, p95, p99) for the given group + * and prefix are all zero. Useful for tests that emit with zero samples. + * + * @param counters counters from {@link org.apache.nutch.util.ReducerContextWrapper#getCounters()} + * @param group counter group + * @param prefix counter name prefix + */ + public static void assertPercentilesZero(Counters counters, String group, String prefix) { + assertEquals(0, counters.findCounter(group, prefix + LatencyTracker.SUFFIX_P50_MS).getValue()); + assertEquals(0, counters.findCounter(group, prefix + LatencyTracker.SUFFIX_P95_MS).getValue()); + assertEquals(0, counters.findCounter(group, prefix + LatencyTracker.SUFFIX_P99_MS).getValue()); + } + + /** + * Asserts that count, sum, and all percentile counters are zero. + * + * @param counters counters from {@link org.apache.nutch.util.ReducerContextWrapper#getCounters()} + * @param group counter group + * @param prefix counter name prefix + */ + public static void assertCountSumAndPercentilesZero(Counters counters, String group, String prefix) { + assertCountAndSum(counters, group, prefix, 0, 0); + assertPercentilesZero(counters, group, prefix); + } +} diff --git a/src/test/org/apache/nutch/metrics/TestLatencyTracker.java b/src/test/org/apache/nutch/metrics/TestLatencyTracker.java index 1665d71707..863dc3ff39 100644 --- a/src/test/org/apache/nutch/metrics/TestLatencyTracker.java +++ b/src/test/org/apache/nutch/metrics/TestLatencyTracker.java @@ -132,8 +132,7 @@ void testEmitCountAndSumOnlyUpdatesJobCounters() throws IOException, Interrupted new ReducerContextWrapper<>(reducer, conf, out); reducer.reduce(new Text("k"), Collections.singletonList(new Text("v")), wrapper.getContext()); - assertEquals(2, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_COUNT_TOTAL).getValue()); - assertEquals(30, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_SUM_MS).getValue()); + LatencyTestUtil.assertCountAndSum(wrapper.getCounters(), GROUP, PREFIX, 2, 30); } @Test @@ -145,14 +144,8 @@ void testEmitCountersUpdatesJobCounters() throws IOException, InterruptedExcepti new ReducerContextWrapper<>(reducer, conf, out); reducer.reduce(new Text("k"), Collections.singletonList(new Text("v")), wrapper.getContext()); - assertEquals(3, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_COUNT_TOTAL).getValue()); - assertEquals(600, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_SUM_MS).getValue()); - long p50 = wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P50_MS).getValue(); - long p95 = wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P95_MS).getValue(); - long p99 = wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P99_MS).getValue(); - assertTrue(p50 >= 100 && p50 <= 300); - assertTrue(p95 >= 100 && p95 <= 300); - assertTrue(p99 >= 100 && p99 <= 300); + LatencyTestUtil.assertCountAndSum(wrapper.getCounters(), GROUP, PREFIX, 3, 600); + LatencyTestUtil.assertPercentilesInRange(wrapper.getCounters(), GROUP, PREFIX, 100, 300); } @Test @@ -164,11 +157,7 @@ void testEmitCountersWithZeroSamplesSetsPercentilesToZero() throws IOException, new ReducerContextWrapper<>(reducer, conf, out); reducer.reduce(new Text("k"), Collections.emptyList(), wrapper.getContext()); - assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_COUNT_TOTAL).getValue()); - assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_SUM_MS).getValue()); - assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P50_MS).getValue()); - assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P95_MS).getValue()); - assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P99_MS).getValue()); + LatencyTestUtil.assertCountSumAndPercentilesZero(wrapper.getCounters(), GROUP, PREFIX); } @Test @@ -180,14 +169,8 @@ void testSetJobLevelCountersUpdatesJobCounters() throws IOException, Interrupted new ReducerContextWrapper<>(reducer, conf, out); reducer.reduce(new Text("k"), Collections.singletonList(new Text("v")), wrapper.getContext()); - assertEquals(3, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_COUNT_TOTAL).getValue()); - assertEquals(600, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_SUM_MS).getValue()); - long p50 = wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P50_MS).getValue(); - long p95 = wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P95_MS).getValue(); - long p99 = wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P99_MS).getValue(); - assertTrue(p50 >= 100 && p50 <= 300); - assertTrue(p95 >= 100 && p95 <= 300); - assertTrue(p99 >= 100 && p99 <= 300); + LatencyTestUtil.assertCountAndSum(wrapper.getCounters(), GROUP, PREFIX, 3, 600); + LatencyTestUtil.assertPercentilesInRange(wrapper.getCounters(), GROUP, PREFIX, 100, 300); } @Test @@ -199,11 +182,7 @@ void testSetJobLevelCountersWithZeroCountSetsPercentilesToZero() throws IOExcept new ReducerContextWrapper<>(reducer, conf, out); reducer.reduce(new Text("k"), Collections.emptyList(), wrapper.getContext()); - assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_COUNT_TOTAL).getValue()); - assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_SUM_MS).getValue()); - assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P50_MS).getValue()); - assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P95_MS).getValue()); - assertEquals(0, wrapper.getCounters().findCounter(GROUP, PREFIX + LatencyTracker.SUFFIX_P99_MS).getValue()); + LatencyTestUtil.assertCountSumAndPercentilesZero(wrapper.getCounters(), GROUP, PREFIX); } /** Reducer that emits only count and sum via LatencyTracker (real Context, no mocks). */ diff --git a/src/test/org/apache/nutch/parse/TestParseSegment.java b/src/test/org/apache/nutch/parse/TestParseSegment.java index 9feb9fecfa..4b53f0f9c6 100644 --- a/src/test/org/apache/nutch/parse/TestParseSegment.java +++ b/src/test/org/apache/nutch/parse/TestParseSegment.java @@ -33,7 +33,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.nutch.metadata.Metadata; -import org.apache.nutch.metrics.LatencyTracker; +import org.apache.nutch.metrics.LatencyTestUtil; import org.apache.nutch.metrics.NutchMetrics; import org.apache.nutch.net.protocols.Response; import org.apache.nutch.protocol.Content; @@ -106,24 +106,14 @@ void testParseSegmentReducerLatencyKeySetsCounters() throws IOException, Interru ReducerContextWrapper wrapper = new ReducerContextWrapper<>(reducer, conf, out); - LatencyTracker tracker = new LatencyTracker(NutchMetrics.GROUP_PARSER, NutchMetrics.PARSER_LATENCY); - tracker.record(100); - tracker.record(200); - byte[] digestBytes = tracker.toBytes(); + byte[] digestBytes = LatencyTestUtil.createDigestBytes(100, 200); List values = new ArrayList<>(); values.add(new BytesWritable(digestBytes)); reducer.reduce(new Text(NutchMetrics.LATENCY_KEY), values, wrapper.getContext()); - long p50 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_PARSER, - NutchMetrics.PARSER_LATENCY + LatencyTracker.SUFFIX_P50_MS).getValue(); - long p95 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_PARSER, - NutchMetrics.PARSER_LATENCY + LatencyTracker.SUFFIX_P95_MS).getValue(); - long p99 = wrapper.getCounters().findCounter(NutchMetrics.GROUP_PARSER, - NutchMetrics.PARSER_LATENCY + LatencyTracker.SUFFIX_P99_MS).getValue(); - assertTrue(p50 >= 100 && p50 <= 200); - assertTrue(p95 >= 100 && p95 <= 200); - assertTrue(p99 >= 100 && p99 <= 200); + LatencyTestUtil.assertPercentilesInRange(wrapper.getCounters(), + NutchMetrics.GROUP_PARSER, NutchMetrics.PARSER_LATENCY, 100, 200); assertEquals(0, out.size()); } From f1d5dc33b2c0c31137268a15f05a832d675e0ff5 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Wed, 8 Apr 2026 21:48:39 +0200 Subject: [PATCH 5/6] NUTCH-3162 Latency metrics to properly merge data from all threads and tasks Wrap ParseImpl and BytesWritable holding the serialized T-Digest into a NutchWritable object. --- .../org/apache/nutch/parse/ParseSegment.java | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/src/java/org/apache/nutch/parse/ParseSegment.java b/src/java/org/apache/nutch/parse/ParseSegment.java index bf5c00afda..318eca29ce 100644 --- a/src/java/org/apache/nutch/parse/ParseSegment.java +++ b/src/java/org/apache/nutch/parse/ParseSegment.java @@ -20,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.NutchWritable; import org.apache.nutch.crawl.SignatureFactory; import org.apache.nutch.segment.SegmentChecker; import org.apache.nutch.util.NutchConfiguration; @@ -48,7 +49,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Partitioner; @@ -79,7 +79,7 @@ public ParseSegment(Configuration conf) { } public static class ParseSegmentMapper extends - Mapper, Content, Text, Writable> { + Mapper, Content, Text, NutchWritable> { private ParseUtil parseUtil; private Text newKey = new Text(); @@ -89,7 +89,7 @@ public static class ParseSegmentMapper extends private ErrorTracker errorTracker; @Override - public void setup(Mapper, Content, Text, Writable>.Context context) { + public void setup(Mapper, Content, Text, NutchWritable>.Context context) { Configuration conf = context.getConfiguration(); scfilters = new ScoringFilters(conf); skipTruncated = conf.getBoolean(SKIP_TRUNCATED, true); @@ -100,18 +100,19 @@ public void setup(Mapper, Content, Text, Writable>.Context } @Override - public void cleanup(Mapper, Content, Text, Writable>.Context context) + public void cleanup(Mapper, Content, Text, NutchWritable>.Context context) throws IOException, InterruptedException { parseLatencyTracker.emitCountAndSumOnly(context); byte[] digestBytes = parseLatencyTracker.toBytes(); if (digestBytes.length > 0) { - context.write(new Text(NutchMetrics.LATENCY_KEY), new BytesWritable(digestBytes)); + context.write(new Text(NutchMetrics.LATENCY_KEY), + new NutchWritable(new BytesWritable(digestBytes))); } } @Override public void map(WritableComparable key, Content content, - Mapper, Content, Text, Writable>.Context context) + Mapper, Content, Text, NutchWritable>.Context context) throws IOException, InterruptedException { // convert on the fly from old UTF8 keys if (key instanceof Text) { @@ -184,8 +185,8 @@ public void map(WritableComparable key, Content content, context.write( url, - new ParseImpl(new ParseText(parse.getText()), parse.getData(), parse - .isCanonical())); + new NutchWritable(new ParseImpl(new ParseText(parse.getText()), + parse.getData(), parse.isCanonical()))); } } } @@ -241,9 +242,9 @@ public static boolean isTruncated(Content content) { } /** Sends LATENCY_KEY to partition 0 so one reducer merges all TDigests. */ - public static class ParseSegmentPartitioner extends Partitioner { + public static class ParseSegmentPartitioner extends Partitioner { @Override - public int getPartition(Text key, Writable value, int numPartitions) { + public int getPartition(Text key, NutchWritable value, int numPartitions) { if (numPartitions <= 1) { return 0; } @@ -255,19 +256,19 @@ public int getPartition(Text key, Writable value, int numPartitions) { } public static class ParseSegmentReducer extends - Reducer { + Reducer { private static final Text LATENCY_KEY = new Text(NutchMetrics.LATENCY_KEY); @Override - public void reduce(Text key, Iterable values, - Reducer.Context context) + public void reduce(Text key, Iterable values, + Reducer.Context context) throws IOException, InterruptedException { if (key.equals(LATENCY_KEY)) { com.tdunning.math.stats.MergingDigest merged = null; - for (Writable w : values) { - if (w instanceof BytesWritable) { - byte[] bytes = ((BytesWritable) w).copyBytes(); + for (NutchWritable w : values) { + if (w.get() instanceof BytesWritable) { + byte[] bytes = ((BytesWritable) w.get()).copyBytes(); if (bytes != null && bytes.length > 0) { com.tdunning.math.stats.MergingDigest d = LatencyTracker.fromBytes(bytes); if (d != null) { @@ -290,9 +291,9 @@ public void reduce(Text key, Iterable values, } return; } - Iterator valuesIter = values.iterator(); + Iterator valuesIter = values.iterator(); if (valuesIter.hasNext()) { - context.write(key, (ParseImpl) valuesIter.next()); + context.write(key, (ParseImpl) valuesIter.next().get()); } } } @@ -319,7 +320,7 @@ public void parse(Path segment) throws IOException, job.setMapperClass(ParseSegment.ParseSegmentMapper.class); job.setReducerClass(ParseSegment.ParseSegmentReducer.class); job.setPartitionerClass(ParseSegment.ParseSegmentPartitioner.class); - job.setMapOutputValueClass(Writable.class); + job.setMapOutputValueClass(NutchWritable.class); FileOutputFormat.setOutputPath(job, segment); job.setOutputFormatClass(ParseOutputFormat.class); From f11a945e1f22ba526c43925bfe705b848fedff4d Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Thu, 9 Apr 2026 12:02:53 +0200 Subject: [PATCH 6/6] NUTCH-3162 Latency metrics to properly merge data from all threads and tasks Fix unit test. --- .../apache/nutch/parse/TestParseSegment.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/test/org/apache/nutch/parse/TestParseSegment.java b/src/test/org/apache/nutch/parse/TestParseSegment.java index 4b53f0f9c6..e166bc8912 100644 --- a/src/test/org/apache/nutch/parse/TestParseSegment.java +++ b/src/test/org/apache/nutch/parse/TestParseSegment.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,7 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; +import org.apache.nutch.crawl.NutchWritable; import org.apache.nutch.metadata.Metadata; import org.apache.nutch.metrics.LatencyTestUtil; import org.apache.nutch.metrics.NutchMetrics; @@ -103,14 +102,15 @@ void testParseSegmentReducerLatencyKeySetsCounters() throws IOException, Interru Configuration conf = NutchConfiguration.create(); Map out = new HashMap<>(); ParseSegment.ParseSegmentReducer reducer = new ParseSegment.ParseSegmentReducer(); - ReducerContextWrapper wrapper = - new ReducerContextWrapper<>(reducer, conf, out); + ReducerContextWrapper wrapper = new ReducerContextWrapper<>( + reducer, conf, out); byte[] digestBytes = LatencyTestUtil.createDigestBytes(100, 200); - List values = new ArrayList<>(); - values.add(new BytesWritable(digestBytes)); + List values = new ArrayList<>(); + values.add(new NutchWritable(new BytesWritable(digestBytes))); - reducer.reduce(new Text(NutchMetrics.LATENCY_KEY), values, wrapper.getContext()); + reducer.reduce(new Text(NutchMetrics.LATENCY_KEY), values, + wrapper.getContext()); LatencyTestUtil.assertPercentilesInRange(wrapper.getCounters(), NutchMetrics.GROUP_PARSER, NutchMetrics.PARSER_LATENCY, 100, 200); @@ -125,13 +125,16 @@ void testParseSegmentReducerLatencyKeySetsCounters() throws IOException, Interru void testParseSegmentPartitionerSendsLatencyKeyToPartitionZero() { ParseSegment.ParseSegmentPartitioner partitioner = new ParseSegment.ParseSegmentPartitioner(); int numPartitions = 4; - assertEquals(0, partitioner.getPartition(new Text(NutchMetrics.LATENCY_KEY), new BytesWritable(), numPartitions)); + assertEquals(0, partitioner.getPartition(new Text(NutchMetrics.LATENCY_KEY), + new NutchWritable(new BytesWritable()), numPartitions)); } @Test void testParseSegmentPartitionerWithSinglePartition() { ParseSegment.ParseSegmentPartitioner partitioner = new ParseSegment.ParseSegmentPartitioner(); - assertEquals(0, partitioner.getPartition(new Text(NutchMetrics.LATENCY_KEY), new BytesWritable(), 1)); - assertEquals(0, partitioner.getPartition(new Text("http://example.com/"), new BytesWritable(), 1)); + assertEquals(0, partitioner.getPartition(new Text(NutchMetrics.LATENCY_KEY), + new NutchWritable(new BytesWritable()), 1)); + assertEquals(0, partitioner.getPartition(new Text("http://example.com/"), + new NutchWritable(new BytesWritable()), 1)); } }