Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.phoenix.filter.RowKeyComparisonFilter.RowKeyTuple;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
Expand Down Expand Up @@ -144,13 +145,13 @@ public class CompactionScanner implements InternalScanner {
private final long maxLookbackInMillis;
private int minVersion;
private int maxVersion;
private final boolean emptyCFStore;
private boolean emptyCFStore;
private final boolean localIndex;
private final int familyCount;
private KeepDeletedCells keepDeletedCells;
private long compactionTime;
private final byte[] emptyCF;
private final byte[] emptyCQ;
private byte[] emptyCF;
private byte[] emptyCQ;
private final byte[] storeColumnFamily;
private final String tableName;
private final String columnFamilyName;
Expand All @@ -162,6 +163,7 @@ public class CompactionScanner implements InternalScanner {
private long outputCellCount = 0;
private boolean phoenixLevelOnly = false;
private boolean isCDCIndex;
private boolean isEmptyCfCqInitialized;

// Only for forcing minor compaction while testing
private static boolean forceMinorCompaction = false;
Expand All @@ -180,13 +182,15 @@ public CompactionScanner(RegionCoprocessorEnvironment env,
// Empty column family and qualifier are always needed to compute which all empty cells to retain
// even during minor compactions. If required empty cells are not retained during
// minor compactions then we can run into the risk of partial row expiry on next major compaction.
this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
this.emptyCQ = SchemaUtil.getEmptyColumnQualifier(table);
this.emptyCF = table != null ? SchemaUtil.getEmptyColumnFamily(table) : null;
this.emptyCQ = table != null ? SchemaUtil.getEmptyColumnQualifier(table) : null;
// PTable will be null only for flushes
this.isEmptyCfCqInitialized = table != null;
compactionTime = EnvironmentEdgeManager.currentTimeMillis();
columnFamilyName = store.getColumnFamilyName();
storeColumnFamily = columnFamilyName.getBytes();
tableName = region.getRegionInfo().getTable().getNameAsString();
String dataTableName = table.getName().toString();
String dataTableName = table != null ? table.getName().toString() : "";
Long overriddenMaxLookback = maxLookbackMap.get(tableName + SEPARATOR + columnFamilyName);
this.maxLookbackInMillis = overriddenMaxLookback == null ?
maxLookbackAgeInMillis : Math.max(maxLookbackAgeInMillis, overriddenMaxLookback);
Expand Down Expand Up @@ -416,11 +420,41 @@ private void postProcessForConditionalTTL(List<Cell> result) {
}
}

private void determineEmptyCfCq(List<Cell> result) {
    // Determines the empty column family/qualifier from the first batch of cells when no
    // PTable was supplied (the flush path). Should be called at most once per instance.
    assert !isEmptyCfCqInitialized;
    for (Cell cell : result) {
        // Use a local name distinct from the field to avoid shadowing this.emptyCF.
        byte[] family = CellUtil.cloneFamily(cell);
        if (ScanUtil.isEmptyColumn(cell, family, QueryConstants.EMPTY_COLUMN_BYTES)) {
            this.emptyCF = family;
            this.emptyCQ = QueryConstants.EMPTY_COLUMN_BYTES;
            this.emptyCFStore = true;
            break;
        } else if (ScanUtil.isEmptyColumn(cell, family,
                QueryConstants.ENCODED_EMPTY_COLUMN_BYTES)) {
            // Empty column is always encoded in FOUR_BYTE format, since it's a reserved
            // qualifier. See EncodedColumnsUtil#isReservedColumnQualifier.
            this.emptyCF = family;
            this.emptyCQ = QueryConstants.ENCODED_EMPTY_COLUMN_BYTES;
            this.emptyCFStore = true;
            break;
        }
    }
    // Mark initialized even when no empty column was found (a non-empty-CF store);
    // callers must keep handling null emptyCF/emptyCQ in that case.
    this.isEmptyCfCqInitialized = true;
}

@Override
public boolean next(List<Cell> result) throws IOException {
boolean hasMore = storeScanner.next(result);
inputCellCount += result.size();
if (!result.isEmpty()) {
// This will happen only during flushes as then we don't pass PTable object
// to determine emptyCF and emptyCQ
if (!this.isEmptyCfCqInitialized) {
determineEmptyCfCq(result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For not emptycfstore aren't we doing this check on every row

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, will fix that. Thanks

}
// This is for debugging
// printRow(result, "Input for " + tableName + " " + columnFamilyName, true, false);
phoenixLevelRowCompactor.compact(result, false);
Expand Down Expand Up @@ -2459,7 +2493,8 @@ private void getLastRowVersionInMaxLookbackWindow(List<Cell> result,
}
if (cell.getType() == Cell.Type.Put) {
lastRowVersion.add(cell);
if (ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) {
if (emptyCF != null && emptyCQ != null
&& ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) {
index = addEmptyColumn(result, currentColumnCell, index, emptyColumn);
} else {
index = skipColumn(result, currentColumnCell, retainedCells, index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.Region;
Expand Down Expand Up @@ -611,6 +612,29 @@ private boolean areMutationsInSameTable(Table targetHTable, Region region) {
region.getTableDescriptor().getTableName().getName()) == 0);
}

@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, FlushLifeCycleTracker tracker)
throws IOException {
if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using the same config to control both flushing and compaction. Will there be a scenario where we want to disable Phoenix compaction on flushing but still continue to use Phoenix compaction for major/minor compaction ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be a scenario where we want to disable Phoenix compaction on flushing but still continue to use Phoenix compaction for major/minor compaction ?

I think in general it will be good to have this flexibility. Shall I introduce a new config to enable disable preFlush hook separately?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think it will be better because Phoenix compaction has already been running in production and this is a new feature so having this flexibility will be helpful. We can enable it by default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flexibility will be helpful. I think we can go ahead with merging the changes after config is added and perf can be done later. The config will anyways be helpful to turn off the feature, WDYT @tkhurana?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are fine with waiting for a day or 2 more then I can post the perf results. For single column family its done. I am currently, doing perf analysis of multi-CF. Thanks

Copy link
Contributor Author

@sanjeet006py sanjeet006py May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@virajjasani perf analysis for multi-CF will take some time as I got to know that HBase flushTime metrics for multi-CF case sometimes could end up tracking combined time for flushing multiple CFs and other times only 1 CF. So, need to directly track time taken by StoreFlusher.
One idea is we wait for perf analysis before merging this PR but 5.3 release can go on or if we want this PR in 5.3 then for now we can disable preFlush hook via config and only enable it after perf analysis. I am bit inclined towards first approach. WDYT @virajjasani @tkhurana?

Copy link
Contributor

@tkhurana tkhurana May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sanjeet006py Instead of relying on metrics you could also use the log messages when flush happens. For example 2025-05-07 16:47:34,580 INFO [MemStoreFlusher.1] regionserver.HRegion - Finished flush of dataSize ~255.34 MB/267746710, heapSize ~256.01 MB/268445696, currentSize=10.04 MB/10529365 for 397f5412f43294d01081c54d7253d378 in 1986ms, sequenceid=207416387, compaction requested=false" Does that have the same problem too ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we don't need absolute numbers but just a comparison to make sure nothing is regressed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that have the same problem too ?

Yes as this log line only tells us how much time all the stores which were to be flushed took to flush data as total. I added a log line in StoreFlusher and will test with that now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the config to disable CompactionScanner for flushes.

return scanner;
} else {
return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
@Override public InternalScanner run() throws Exception {
String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable()
.getNameAsString();
Configuration conf = c.getEnvironment().getConfiguration();
long maxLookbackInMillis =
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf);
maxLookbackInMillis = CompactionScanner.getMaxLookbackInMillis(tableName,
store.getColumnFamilyName(), maxLookbackInMillis);
return new CompactionScanner(c.getEnvironment(), store, scanner,
maxLookbackInMillis, false, true, null);
}
});
}
}

@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,67 @@ public void testRetainingLastRowVersion() throws Exception {
TestUtil.dumpTable(conn, dataTableName);
ResultSet rs = stmt.executeQuery("select * from " + dataTableName + " where id = 'a'");
while(rs.next()) {
assertNotNull(rs.getString(3));
assertNotNull(rs.getString(4));
assertEquals("abc", rs.getString(3));
assertEquals("abcd", rs.getString(4));
}
}
}

@Test(timeout=60000)
public void testRetainingLastRowVersionForFlushes() throws Exception {
try(Connection conn = DriverManager.getConnection(getUrl())) {
String tableName = generateUniqueName();
createTable(tableName);
long timeIntervalBetweenTwoUpserts = (ttl / 2) + 1;
injectEdge.setValue(System.currentTimeMillis());
EnvironmentEdgeManager.injectEdge(injectEdge);
TableName dataTableName = TableName.valueOf(tableName);
injectEdge.incrementValue(1);
Statement stmt = conn.createStatement();
stmt.execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
conn.commit();
injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
stmt.execute("upsert into " + tableName + " values ('a', 'ab1')");
conn.commit();
injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
stmt.execute("upsert into " + tableName + " values ('a', 'ab2')");
conn.commit();
injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
stmt.execute("upsert into " + tableName + " values ('a', 'ab3')");
conn.commit();
injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);

TestUtil.dumpTable(conn, dataTableName);
byte[] rowKey = Bytes.toBytes("a");
int rawCellCount = TestUtil.getRawCellCount(conn, dataTableName, rowKey);
// 6 non-empty cells (ab3, ab2, ab1, ab, abc, abcd) + 4 empty cells (for 4 upserts)
assertEquals(10, rawCellCount);

TestUtil.flush(utility, dataTableName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use TestUtil.getRawCellCount and verify that extra row versions are removed.

injectEdge.incrementValue(1);
rawCellCount = TestUtil.getRawCellCount(conn, dataTableName, rowKey);
// 1 non-empty cell (ab3) and 1 empty cell in max lookback window are
// immediately retained.
// 3 non-empty cells outside max lookback window will be retained (ab2, abc, abcd)
// 3 (for multi-CF)/ 2 (single-CF) empty cells will be retained outside
// max lookback window
assertEquals(multiCF ? 8 : 7, rawCellCount);
TestUtil.dumpTable(conn, dataTableName);

majorCompact(dataTableName);
injectEdge.incrementValue(1);
rawCellCount = TestUtil.getRawCellCount(conn, dataTableName, rowKey);
// 1 non-empty cell (ab3) and 1 empty cell at the edge of max lookback window will be
// retained
// 2 non-empty cells outside max lookback window will be retained (abc, abcd)
// 2 empty cells will be retained outside max lookback window
assertEquals(6, rawCellCount);
TestUtil.dumpTable(conn, dataTableName);

ResultSet rs = stmt.executeQuery("select * from " + dataTableName + " where id = 'a'");
while(rs.next()) {
assertEquals("abc", rs.getString(3));
assertEquals("abcd", rs.getString(4));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ public static synchronized Collection<Object[]> data() {
{ false, false, KeepDeletedCells.FALSE, 1, 100, null},
{ false, false, KeepDeletedCells.TRUE, 5, 50, null},
{ false, false, KeepDeletedCells.TTL, 1, 25, null},
{ true, false, KeepDeletedCells.FALSE, 5, 50, null},
{ true, false, KeepDeletedCells.TRUE, 1, 25, null},
{ true, false, KeepDeletedCells.FALSE, 5, 50, 0},
{ true, false, KeepDeletedCells.TRUE, 1, 25, 0},
{ true, false, KeepDeletedCells.TTL, 5, 100, null},
{ false, false, KeepDeletedCells.FALSE, 1, 100, 0},
{ false, false, KeepDeletedCells.TRUE, 5, 50, 0},
Expand Down Expand Up @@ -246,7 +246,7 @@ public void testMaskingAndMajorCompaction() throws Exception {
}

@Test
public void testMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
public void testMinorCompactionAndFlushesShouldNotRetainCellsWhenMaxLookbackIsDisabled()
throws Exception {
if (tableLevelMaxLookback == null || tableLevelMaxLookback != 0) {
return;
Expand All @@ -273,15 +273,31 @@ public void testMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
Thread.sleep(1);
}
flush(TableName.valueOf(tableName));
// Flushes dump and retain all the cells to HFile.
// Doing MAX_COLUMN_INDEX + 1 to account for empty cells
assertEquals(TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), row),
rowUpdateCounter * (MAX_COLUMN_INDEX + 1));
// At every flush, extra cell versions should be removed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to rename this test method to testMinorCompactionAndFlushShouldNotRetainCellsWhenMaxLookbackIsDisabled

// MAX_COLUMN_INDEX table columns will be retained for each row version.
int expectedMaxRawCellCount;
if (multiCF) {
// All empty cells are retained for multiCF tables for flushes and minor
// compactions
expectedMaxRawCellCount =
((i + 1) * MAX_COLUMN_INDEX * versions) + rowUpdateCounter;
}
else {
// Only empty column is retained for each row version
expectedMaxRawCellCount = (i + 1) * (MAX_COLUMN_INDEX + 1) * versions;
}
int rawCellCount = TestUtil.getRawCellCount(
conn, TableName.valueOf(tableName), row);
// Need inequality check here as a minor compaction could have happened
assertTrue(rawCellCount <= expectedMaxRawCellCount);
}
// Run one minor compaction (in case no minor compaction has happened yet)
TestUtil.minorCompact(utility, TableName.valueOf(tableName));
assertEquals(TestUtil.getRawCellCount(conn, TableName.valueOf(tableName),
Bytes.toBytes("a")), (MAX_COLUMN_INDEX + 1) * versions);
int rawCellCount = TestUtil.getRawCellCount(conn, TableName.valueOf(tableName),
Bytes.toBytes("a"));
int expectedRawCellCount = (MAX_COLUMN_INDEX * versions)
+ (multiCF ? rowUpdateCounter : versions);
assertEquals(expectedRawCellCount, rawCellCount);
} catch (AssertionError e) {
TestUtil.dumpTable(conn, TableName.valueOf(tableName));
throw e;
Expand Down