From 267ba8b6a53b6635b5d3ec651a011d177dcebdd2 Mon Sep 17 00:00:00 2001 From: "Md. Mosaddek Ali" Date: Sun, 8 Mar 2026 01:45:19 +0600 Subject: [PATCH 1/3] NUTCH-1446 Port NUTCH-1444 to trunk (Indexing should not create temporary files) --- .../nutch/indexer/IndexerOutputFormat.java | 74 +++++++++++++------ 1 file changed, 51 insertions(+), 23 deletions(-) diff --git a/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java b/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java index 9bb3b6fdad..8166c73d48 100644 --- a/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java +++ b/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java @@ -1,37 +1,30 @@ /* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. */ + package org.apache.nutch.indexer; import java.io.IOException; import java.lang.invoke.MethodHandles; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class IndexerOutputFormat - extends FileOutputFormat { +public class IndexerOutputFormat extends OutputFormat { - private static final Logger LOG = LoggerFactory - .getLogger(MethodHandles.lookup().lookupClass()); + private static final Logger LOG = + LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @Override public RecordWriter getRecordWriter( @@ -40,32 +33,67 @@ public RecordWriter getRecordWriter( Configuration conf = context.getConfiguration(); final IndexWriters writers = IndexWriters.get(conf); - String name = getUniqueFile(context, "part", ""); - writers.open(conf, name); + // open writers (no temporary file output anymore) + writers.open(conf, "index"); LOG.info(writers.describe()); return new RecordWriter() { @Override public void close(TaskAttemptContext context) throws IOException { - // do the commits once and for all the reducers in one go - boolean noCommit = conf - .getBoolean(IndexerMapReduce.INDEXER_NO_COMMIT, false); + + boolean noCommit = + conf.getBoolean(IndexerMapReduce.INDEXER_NO_COMMIT, false); + if (!noCommit) { writers.commit(); } + writers.close(); } @Override public void write(Text key, NutchIndexAction indexAction) throws IOException { + if (indexAction.action == NutchIndexAction.ADD) { writers.write(indexAction.doc); + } else if (indexAction.action == NutchIndexAction.DELETE) { writers.delete(key.toString()); } } }; } -} + + @Override + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + // No output specs required since we don't write files + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + + return new OutputCommitter() { + + @Override + public void setupJob(JobContext jobContext) {} + + @Override + public void setupTask(TaskAttemptContext taskContext) {} + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskContext) { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskContext) {} + + @Override + public void abortTask(TaskAttemptContext taskContext) {} + }; + } +} \ No newline at end of file From 88bb55738621eb755e652f5922ccf83c707c2a21 Mon Sep 17 00:00:00 2001 From: "Md. Mosaddek Ali" Date: Mon, 9 Mar 2026 03:23:27 +0600 Subject: [PATCH 2/3] NUTCH-1446 Address review comments (unique index name) --- .../apache/nutch/indexer/IndexerOutputFormat.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java b/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java index 8166c73d48..63c3e8c0dc 100644 --- a/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java +++ b/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java @@ -1,8 +1,18 @@ /* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.nutch.indexer; @@ -34,7 +44,8 @@ public RecordWriter getRecordWriter( final IndexWriters writers = IndexWriters.get(conf); // open writers (no temporary file output anymore) - writers.open(conf, "index"); + String indexName = "index-" + context.getTaskAttemptID().toString(); + writers.open(conf, indexName); LOG.info(writers.describe()); return new RecordWriter() { From 7e33d78459828ba2c7c8d3d7cd5af0de04594d57 Mon Sep 17 00:00:00 2001 From: "Md. Mosaddek Ali" Date: Tue, 10 Mar 2026 01:46:41 +0600 Subject: [PATCH 3/3] Restore explanatory comment for noCommit logic --- src/java/org/apache/nutch/indexer/IndexerOutputFormat.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java b/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java index 63c3e8c0dc..5717e0ab09 100644 --- a/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java +++ b/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java @@ -53,8 +53,9 @@ public RecordWriter getRecordWriter( @Override public void close(TaskAttemptContext context) throws IOException { - boolean noCommit = - conf.getBoolean(IndexerMapReduce.INDEXER_NO_COMMIT, false); + // do the commits once and for all the reducers in one go + boolean noCommit = + conf.getBoolean(IndexerMapReduce.INDEXER_NO_COMMIT, false); if (!noCommit) { writers.commit();