Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void processLogFile(FileSystem fs, Path filePath) throws IOException {

if (!logFileReaderOptional.isPresent()) {
// This is an empty file, assume processed successfully and return
LOG.warn("Found empty file to process {}", filePath);
LOG.info("Ignoring zero length replication log file {}", filePath);
return;
}

Expand Down Expand Up @@ -303,32 +303,35 @@ protected Optional<LogFileReader> createLogFileReader(FileSystem fs, Path filePa
LogFileReader logFileReader = new LogFileReader();
LogFileReaderContext logFileReaderContext =
new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(filePath);
boolean isClosed = isFileClosed(fs, filePath);
if (isClosed) {
// As file is closed, ensure that the file has a valid header and trailer
logFileReader.init(logFileReaderContext);
return Optional.of(logFileReader);
} else {
LOG.warn("Found un-closed file {}. Starting lease recovery.", filePath);
try {
// Ensure to recover lease first, in case file was un-closed. If it was already closed,
// recoverLease would return true immediately.
recoverLease(fs, filePath);
if (fs.getFileStatus(filePath).getLen() <= 0) {
// Found empty file, returning null LogReader
if (fs.getFileStatus(filePath).getLen() > 0) {
try {
// Acquired the lease, try to create reader with validation both header and trailer
logFileReader.init(logFileReaderContext);
return Optional.of(logFileReader);
} catch (InvalidLogTrailerException invalidLogTrailerException) {
// If trailer is missing or corrupt, create a new reader without trailer validation.
// We must create a new instance because close() sets the closed flag, making
// the old instance unusable for reading.
LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException);
closeReader(logFileReader);
logFileReaderContext.setValidateTrailer(false);
logFileReader = new LogFileReader();
logFileReader.init(logFileReaderContext);
return Optional.of(logFileReader);
}
} else {
// Ignore the file and returning empty LogReader.
return Optional.empty();
}
try {
// Acquired the lease, try to create reader with validation both header and trailer
logFileReader.init(logFileReaderContext);
return Optional.of(logFileReader);
} catch (InvalidLogTrailerException invalidLogTrailerException) {
// If trailer is missing or corrupt, create reader without trailer validation
LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException);
logFileReaderContext.setValidateTrailer(false);
logFileReader.init(logFileReaderContext);
return Optional.of(logFileReader);
} catch (IOException exception) {
LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception);
throw exception;
}
} catch (IOException exception) {
LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception);
// close the reader to avoid leaking socket connection
closeReader(logFileReader);
throw exception;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.replication.log.LogFile;
import org.apache.phoenix.replication.log.LogFileReader;
import org.apache.phoenix.replication.log.LogFileReaderContext;
import org.apache.phoenix.replication.log.LogFileTestUtil;
Expand Down Expand Up @@ -182,27 +183,118 @@ public void testCreateLogFileReaderWithNonExistentFile() throws IOException {
}

/**
* Tests error handling when attempting to create LogFileReader with an invalid/corrupted file.
* Tests that createLogFileReader returns empty for a zero-byte file.
*/
@Test
public void testCreateLogFileReaderWithInvalidLogFile() throws IOException {
public void testCreateLogFileReaderWithEmptyLogFile() throws IOException {
Path invalidFilePath = new Path(testFolder.newFile("invalid_file").toURI());
localFs.create(invalidFilePath).close(); // Create empty file
ReplicationLogProcessor replicationLogProcessor =
new ReplicationLogProcessor(conf, testHAGroupName);
try {
replicationLogProcessor.createLogFileReader(localFs, invalidFilePath);
fail("Should throw IOException for invalid file");
} catch (IOException e) {
// Should throw some kind of IOException when trying to read header
assertTrue("Should throw IOException", true);
Optional<LogFileReader> optionalLogFileReader =
replicationLogProcessor.createLogFileReader(localFs, invalidFilePath);
assertFalse("Reader should not be present for empty file", optionalLogFileReader.isPresent());
} finally {
// Delete the invalid file
localFs.delete(invalidFilePath);
replicationLogProcessor.close();
}
}

/**
* Tests that createLogFileReader handles InvalidLogTrailerException by closing the old reader and
* returning a new usable reader. Simulates a writer crash after sync but before close, resulting
* in a file with valid header and data but no trailer.
*/
@Test
public void testCreateLogFileReaderWithMissingTrailer() throws IOException {
Path filePath = new Path(testFolder.newFile("missing_trailer_file").toURI());
String tableName = "T_" + generateUniqueName();

LogFileWriter writer = initLogFileWriter(filePath);
Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
writer.append(tableName, 1, put);
writer.sync();
// Do NOT call writer.close() -- skips trailer, simulates a writer crash after sync

assertTrue("File should exist", localFs.exists(filePath));
assertTrue("File should be non-empty", localFs.getFileStatus(filePath).getLen() > 0);

ReplicationLogProcessor spyProcessor =
Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName));
try {
Optional<LogFileReader> optionalReader = spyProcessor.createLogFileReader(localFs, filePath);
assertTrue("Reader should be present for file with missing trailer",
optionalReader.isPresent());

// Verify the old reader was closed during InvalidLogTrailerException handling
Mockito.verify(spyProcessor, Mockito.times(1)).closeReader(Mockito.any(LogFileReader.class));

// Verify the returned reader is functional (not stuck with closed=true)
LogFileReader reader = optionalReader.get();
Iterator<LogFile.Record> iterator = reader.iterator();
assertTrue("Reader should be able to read records", iterator.hasNext());

LogFile.Record record = iterator.next();
assertNotNull("Record should not be null", record);
assertEquals("Table name should match", tableName, record.getHBaseTableName());

assertFalse("Should have no more records after the single appended record",
iterator.hasNext());
reader.close();
} finally {
localFs.delete(filePath);
spyProcessor.close();
}
}

/**
* Tests that when the first init() throws InvalidLogTrailerException and the second init()
* (without trailer validation) throws IOException, closeReader is called for both reader
* instances and the IOException propagates to the caller.
*/
@Test
public void testCreateLogFileReaderWithTrailerExceptionThenIOException() throws IOException {
// Create a file with valid header + data but no trailer to trigger InvalidLogTrailerException
Path filePath = new Path(testFolder.newFile("trailer_then_io_failure").toURI());
String tableName = "T_" + generateUniqueName();

LogFileWriter writer = initLogFileWriter(filePath);
Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
writer.append(tableName, 1, put);
writer.sync();
// Do NOT call writer.close() -- skips trailer

assertTrue("File should be non-empty", localFs.getFileStatus(filePath).getLen() > 0);

ReplicationLogProcessor spyProcessor =
Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName));

// After closeReader is called for the first reader (InvalidLogTrailerException),
// corrupt the file with garbage so the second init() fails with IOException
Mockito.doAnswer(invocation -> {
invocation.callRealMethod();
org.apache.hadoop.fs.FSDataOutputStream out = localFs.create(filePath, true);
out.write("garbage data that is not a valid log file header".getBytes());
out.close();
return null;
}).doCallRealMethod().when(spyProcessor).closeReader(Mockito.any(LogFileReader.class));

try {
spyProcessor.createLogFileReader(localFs, filePath);
fail("Should throw IOException when second init() fails");
} catch (IOException e) {
// Expected: second init() fails because the file content is now corrupted
} finally {
localFs.delete(filePath);
spyProcessor.close();
}

// closeReader called twice: once for old reader (InvalidLogTrailerException),
// once for new reader (IOException from outer catch)
Mockito.verify(spyProcessor, Mockito.times(2)).closeReader(Mockito.any(LogFileReader.class));
}

/**
* Tests the closeReader method with both null and valid LogFileReader instances.
*/
Expand Down