diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java index 84d4f1ba428..432cf9189d1 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java @@ -238,7 +238,7 @@ public void processLogFile(FileSystem fs, Path filePath) throws IOException { if (!logFileReaderOptional.isPresent()) { // This is an empty file, assume processed successfully and return - LOG.warn("Found empty file to process {}", filePath); + LOG.info("Ignoring zero length replication log file {}", filePath); return; } @@ -303,32 +303,35 @@ protected Optional<LogFileReader> createLogFileReader(FileSystem fs, Path filePa LogFileReader logFileReader = new LogFileReader(); LogFileReaderContext logFileReaderContext = new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(filePath); - boolean isClosed = isFileClosed(fs, filePath); - if (isClosed) { - // As file is closed, ensure that the file has a valid header and trailer - logFileReader.init(logFileReaderContext); - return Optional.of(logFileReader); - } else { - LOG.warn("Found un-closed file {}. Starting lease recovery.", filePath); + try { + // Ensure to recover lease first, in case file was un-closed. If it was already closed, + // recoverLease would return true immediately. 
recoverLease(fs, filePath); - if (fs.getFileStatus(filePath).getLen() <= 0) { - // Found empty file, returning null LogReader + if (fs.getFileStatus(filePath).getLen() > 0) { + try { + // Acquired the lease, try to create reader with validation both header and trailer + logFileReader.init(logFileReaderContext); + return Optional.of(logFileReader); + } catch (InvalidLogTrailerException invalidLogTrailerException) { + // If trailer is missing or corrupt, create a new reader without trailer validation. + // We must create a new instance because close() sets the closed flag, making + // the old instance unusable for reading. + LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException); + closeReader(logFileReader); + logFileReaderContext.setValidateTrailer(false); + logFileReader = new LogFileReader(); + logFileReader.init(logFileReaderContext); + return Optional.of(logFileReader); + } + } else { + // Ignore the file and returning empty LogReader. return Optional.empty(); } - try { - // Acquired the lease, try to create reader with validation both header and trailer - logFileReader.init(logFileReaderContext); - return Optional.of(logFileReader); - } catch (InvalidLogTrailerException invalidLogTrailerException) { - // If trailer is missing or corrupt, create reader without trailer validation - LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException); - logFileReaderContext.setValidateTrailer(false); - logFileReader.init(logFileReaderContext); - return Optional.of(logFileReader); - } catch (IOException exception) { - LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception); - throw exception; - } + } catch (IOException exception) { + LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception); + // close the reader to avoid leaking socket connection + closeReader(logFileReader); + throw exception; } } diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java index 8a985f3ead1..e486f82935a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java @@ -69,6 +69,7 @@ import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.replication.log.LogFile; import org.apache.phoenix.replication.log.LogFileReader; import org.apache.phoenix.replication.log.LogFileReaderContext; import org.apache.phoenix.replication.log.LogFileTestUtil; @@ -182,27 +183,118 @@ public void testCreateLogFileReaderWithNonExistentFile() throws IOException { } /** - * Tests error handling when attempting to create LogFileReader with an invalid/corrupted file. + * Tests that createLogFileReader returns empty for a zero-byte file. 
*/ @Test - public void testCreateLogFileReaderWithInvalidLogFile() throws IOException { + public void testCreateLogFileReaderWithEmptyLogFile() throws IOException { Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI()); localFs.create(invalidFilePath).close(); // Create empty file ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, testHAGroupName); try { - replicationLogProcessor.createLogFileReader(localFs, invalidFilePath); - fail("Should throw IOException for invalid file"); - } catch (IOException e) { - // Should throw some kind of IOException when trying to read header - assertTrue("Should throw IOException", true); + Optional<LogFileReader> optionalLogFileReader = + replicationLogProcessor.createLogFileReader(localFs, invalidFilePath); + assertFalse("Reader should not be present for empty file", optionalLogFileReader.isPresent()); } finally { - // Delete the invalid file localFs.delete(invalidFilePath); replicationLogProcessor.close(); } } + /** + * Tests that createLogFileReader handles InvalidLogTrailerException by closing the old reader and + * returning a new usable reader. Simulates a writer crash after sync but before close, resulting + * in a file with valid header and data but no trailer. 
+ */ + @Test + public void testCreateLogFileReaderWithMissingTrailer() throws IOException { + Path filePath = new Path(testFolder.newFile("missing_trailer_file").toURI()); + String tableName = "T_" + generateUniqueName(); + + LogFileWriter writer = initLogFileWriter(filePath); + Mutation put = LogFileTestUtil.newPut("testRow", 1, 1); + writer.append(tableName, 1, put); + writer.sync(); + // Do NOT call writer.close() -- skips trailer, simulates a writer crash after sync + + assertTrue("File should exist", localFs.exists(filePath)); + assertTrue("File should be non-empty", localFs.getFileStatus(filePath).getLen() > 0); + + ReplicationLogProcessor spyProcessor = + Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName)); + try { + Optional<LogFileReader> optionalReader = spyProcessor.createLogFileReader(localFs, filePath); + assertTrue("Reader should be present for file with missing trailer", + optionalReader.isPresent()); + + // Verify the old reader was closed during InvalidLogTrailerException handling + Mockito.verify(spyProcessor, Mockito.times(1)).closeReader(Mockito.any(LogFileReader.class)); + + // Verify the returned reader is functional (not stuck with closed=true) + LogFileReader reader = optionalReader.get(); + Iterator<LogFile.Record> iterator = reader.iterator(); + assertTrue("Reader should be able to read records", iterator.hasNext()); + + LogFile.Record record = iterator.next(); + assertNotNull("Record should not be null", record); + assertEquals("Table name should match", tableName, record.getHBaseTableName()); + + assertFalse("Should have no more records after the single appended record", + iterator.hasNext()); + reader.close(); + } finally { + localFs.delete(filePath); + spyProcessor.close(); + } + } + + /** + * Tests that when the first init() throws InvalidLogTrailerException and the second init() + * (without trailer validation) throws IOException, closeReader is called for both reader + * instances and the IOException propagates to the caller. 
+ */ + @Test + public void testCreateLogFileReaderWithTrailerExceptionThenIOException() throws IOException { + // Create a file with valid header + data but no trailer to trigger InvalidLogTrailerException + Path filePath = new Path(testFolder.newFile("trailer_then_io_failure").toURI()); + String tableName = "T_" + generateUniqueName(); + + LogFileWriter writer = initLogFileWriter(filePath); + Mutation put = LogFileTestUtil.newPut("testRow", 1, 1); + writer.append(tableName, 1, put); + writer.sync(); + // Do NOT call writer.close() -- skips trailer + + assertTrue("File should be non-empty", localFs.getFileStatus(filePath).getLen() > 0); + + ReplicationLogProcessor spyProcessor = + Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName)); + + // After closeReader is called for the first reader (InvalidLogTrailerException), + // corrupt the file with garbage so the second init() fails with IOException + Mockito.doAnswer(invocation -> { + invocation.callRealMethod(); + org.apache.hadoop.fs.FSDataOutputStream out = localFs.create(filePath, true); + out.write("garbage data that is not a valid log file header".getBytes()); + out.close(); + return null; + }).doCallRealMethod().when(spyProcessor).closeReader(Mockito.any(LogFileReader.class)); + + try { + spyProcessor.createLogFileReader(localFs, filePath); + fail("Should throw IOException when second init() fails"); + } catch (IOException e) { + // Expected: second init() fails because the file content is now corrupted + } finally { + localFs.delete(filePath); + spyProcessor.close(); + } + + // closeReader called twice: once for old reader (InvalidLogTrailerException), + // once for new reader (IOException from outer catch) + Mockito.verify(spyProcessor, Mockito.times(2)).closeReader(Mockito.any(LogFileReader.class)); + } + /** * Tests the closeReader method with both null and valid LogFileReader instances. */