Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.phoenix.filter.RowKeyComparisonFilter.RowKeyTuple;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
Expand Down Expand Up @@ -144,13 +145,13 @@ public class CompactionScanner implements InternalScanner {
private final long maxLookbackInMillis;
private int minVersion;
private int maxVersion;
private final boolean emptyCFStore;
private boolean emptyCFStore;
private final boolean localIndex;
private final int familyCount;
private KeepDeletedCells keepDeletedCells;
private long compactionTime;
private final byte[] emptyCF;
private final byte[] emptyCQ;
private byte[] emptyCF;
private byte[] emptyCQ;
private final byte[] storeColumnFamily;
private final String tableName;
private final String columnFamilyName;
Expand Down Expand Up @@ -180,13 +181,13 @@ public CompactionScanner(RegionCoprocessorEnvironment env,
// Empty column family and qualifier are always needed to compute which all empty cells to retain
// even during minor compactions. If required empty cells are not retained during
// minor compactions then we can run into the risk of partial row expiry on next major compaction.
this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
this.emptyCQ = SchemaUtil.getEmptyColumnQualifier(table);
this.emptyCF = table != null ? SchemaUtil.getEmptyColumnFamily(table) : EMPTY_BYTE_ARRAY;
this.emptyCQ = table != null ? SchemaUtil.getEmptyColumnQualifier(table) : EMPTY_BYTE_ARRAY;
Copy link
Contributor

@virajjasani virajjasani Apr 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not keep emptyCF and emptyCQ as null if PTable is null, so that we can also incorporate this logic?

Instead of

                    if (ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) {
                        index = addEmptyColumn(result, currentColumnCell, index, emptyColumn);
                    } else {
                        index = skipColumn(result, currentColumnCell, retainedCells, index);
                    }

this

                    if (emptyCF != null && emptyCQ != null && ScanUtil.isEmptyColumn(cell, emptyCF,
                            emptyCQ)) {
                        index = addEmptyColumn(result, currentColumnCell, index, emptyColumn);
                    } else {
                        index = skipColumn(result, currentColumnCell, retainedCells, index);
                    }

and similarly, if (emptyCQ == EMPTY_BYTE_ARRAY) too will be simple null check.

I don't think EMPTY_BYTE_ARRAY is allowed as CF:CQ, but while debugging, null check will be more readable rather than using incorrect values of emptyCF and emptyCQ for ScanUtil.isEmptyColumn?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By keeping emptyCF and emptyCQ values as null are we trying to optimize the if check? I actually kept it empty byte array to avoid null handling and nothing will match empty byte array.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think EMPTY_BYTE_ARRAY is allowed as CF:CQ, but while debugging, null check will be more readable rather than using incorrect values of emptyCF and emptyCQ for ScanUtil.isEmptyColumn?

Got it, will change to storing null values. I agree this improves readability. Thanks

compactionTime = EnvironmentEdgeManager.currentTimeMillis();
columnFamilyName = store.getColumnFamilyName();
storeColumnFamily = columnFamilyName.getBytes();
tableName = region.getRegionInfo().getTable().getNameAsString();
String dataTableName = table.getName().toString();
String dataTableName = table != null ? table.getName().toString() : "";
Long overriddenMaxLookback = maxLookbackMap.get(tableName + SEPARATOR + columnFamilyName);
this.maxLookbackInMillis = overriddenMaxLookback == null ?
maxLookbackAgeInMillis : Math.max(maxLookbackAgeInMillis, overriddenMaxLookback);
Expand Down Expand Up @@ -416,11 +417,32 @@ private void postProcessForConditionalTTL(List<Cell> result) {
}
}

private void determineEmptyCfCq(List<Cell> result) {
for (Cell cell : result) {
emptyCF = CellUtil.cloneFamily(cell);
if(ScanUtil.isEmptyColumn(cell, emptyCF, QueryConstants.EMPTY_COLUMN_BYTES)) {
emptyCQ = QueryConstants.EMPTY_COLUMN_BYTES;
emptyCFStore = true;
break;
} //Empty column is always encoded in FOUR_BYTE format, since it's a reserved qualifier. See EncodedColumnsUtil#isReservedColumnQualifier
else if(ScanUtil.isEmptyColumn(cell, emptyCF, QueryConstants.ENCODED_EMPTY_COLUMN_BYTES)) {
emptyCQ = QueryConstants.ENCODED_EMPTY_COLUMN_BYTES;
emptyCFStore = true;
break;
}
}
}

@Override
public boolean next(List<Cell> result) throws IOException {
boolean hasMore = storeScanner.next(result);
inputCellCount += result.size();
if (!result.isEmpty()) {
// This will happen only during flushes as then we don't pass PTable object
// to determine emptyCF and emptyCQ
if (emptyCQ == EMPTY_BYTE_ARRAY) {
determineEmptyCfCq(result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For not emptycfstore aren't we doing this check on every row

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, will fix that. Thanks

}
// This is for debugging
// printRow(result, "Input for " + tableName + " " + columnFamilyName, true, false);
phoenixLevelRowCompactor.compact(result, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.Region;
Expand Down Expand Up @@ -611,6 +612,28 @@ private boolean areMutationsInSameTable(Table targetHTable, Region region) {
region.getTableDescriptor().getTableName().getName()) == 0);
}

@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using the same config to control both flushing and compaction. Will there be a scenario where we want to disable Phoenix compaction on flushing but still continue to use Phoenix compaction for major/minor compaction ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be a scenario where we want to disable Phoenix compaction on flushing but still continue to use Phoenix compaction for major/minor compaction ?

I think in general it will be good to have this flexibility. Shall I introduce a new config to enable disable preFlush hook separately?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think it will be better because Phoenix compaction has already been running in production and this is a new feature so having this flexibility will be helpful. We can enable it by default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flexibility will be helpful. I think we can go ahead with merging the changes after config is added and perf can be done later. The config will anyways be helpful to turn off the feature, WDYT @tkhurana?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are fine with waiting for a day or 2 more then I can post the perf results. For single column family its done. I am currently, doing perf analysis of multi-CF. Thanks

Copy link
Contributor Author

@sanjeet006py sanjeet006py May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@virajjasani perf analysis for multi-CF will take some time as I got to know that HBase flushTime metrics for multi-CF case sometimes could end up tracking combined time for flushing multiple CFs and other times only 1 CF. So, need to directly track time taken by StoreFlusher.
One idea is we wait for perf analysis before merging this PR but 5.3 release can go on or if we want this PR in 5.3 then for now we can disable preFlush hook via config and only enable it after perf analysis. I am bit inclined towards first approach. WDYT @virajjasani @tkhurana?

Copy link
Contributor

@tkhurana tkhurana May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sanjeet006py Instead of relying on metrics you could also use the log messages when flush happens. For example 2025-05-07 16:47:34,580 INFO [MemStoreFlusher.1] regionserver.HRegion - Finished flush of dataSize ~255.34 MB/267746710, heapSize ~256.01 MB/268445696, currentSize=10.04 MB/10529365 for 397f5412f43294d01081c54d7253d378 in 1986ms, sequenceid=207416387, compaction requested=false" Does that have the same problem too ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we don't need absolute numbers but just a comparison to make sure nothing is regressed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that have the same problem too ?

Yes as this log line only tells us how much time all the stores which were to be flushed took to flush data as total. I added a log line in StoreFlusher and will test with that now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the config to disable CompactionScanner for flushes.

return scanner;
} else {
return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
@Override public InternalScanner run() throws Exception {
String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable()
.getNameAsString();
Configuration conf = c.getEnvironment().getConfiguration();
long maxLookbackInMillis =
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf);
maxLookbackInMillis = CompactionScanner.getMaxLookbackInMillis(tableName,
store.getColumnFamilyName(), maxLookbackInMillis);
return new CompactionScanner(c.getEnvironment(), store, scanner,
maxLookbackInMillis, false, true, null);
}
});
}
}

@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,44 @@ public void testRetainingLastRowVersion() throws Exception {
}
}

@Test(timeout=60000)
public void testRetainingLastRowVersionForFlushes() throws Exception {
try(Connection conn = DriverManager.getConnection(getUrl())) {
String tableName = generateUniqueName();
createTable(tableName);
long timeIntervalBetweenTwoUpserts = (ttl / 2) + 1;
injectEdge.setValue(System.currentTimeMillis());
EnvironmentEdgeManager.injectEdge(injectEdge);
TableName dataTableName = TableName.valueOf(tableName);
injectEdge.incrementValue(1);
Statement stmt = conn.createStatement();
stmt.execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
conn.commit();
injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
stmt.execute("upsert into " + tableName + " values ('a', 'ab1')");
conn.commit();
injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
stmt.execute("upsert into " + tableName + " values ('a', 'ab2')");
conn.commit();
injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
stmt.execute("upsert into " + tableName + " values ('a', 'ab3')");
conn.commit();
injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
TestUtil.dumpTable(conn, dataTableName);
TestUtil.flush(utility, dataTableName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use TestUtil.getRawCellCount and verify that extra row versions are removed.

injectEdge.incrementValue(1);
TestUtil.dumpTable(conn, dataTableName);
majorCompact(dataTableName);
injectEdge.incrementValue(1);
TestUtil.dumpTable(conn, dataTableName);
ResultSet rs = stmt.executeQuery("select * from " + dataTableName + " where id = 'a'");
while(rs.next()) {
assertNotNull(rs.getString(3));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's verify the column values are equal to the expected values here.

assertNotNull(rs.getString(4));
}
}
}

private void flush(TableName table) throws IOException {
Admin admin = getUtility().getAdmin();
admin.flush(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,12 @@ public void testMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
Thread.sleep(1);
}
flush(TableName.valueOf(tableName));
// Flushes dump and retain all the cells to HFile.
// Doing MAX_COLUMN_INDEX + 1 to account for empty cells
assertEquals(TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), row),
rowUpdateCounter * (MAX_COLUMN_INDEX + 1));
// At every flush, extra cell versions should be removed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to rename this test method to testMinorCompactionAndFlushShouldNotRetainCellsWhenMaxLookbackIsDisabled

// MAX_COLUMN_INDEX table columns and one empty column will be retained for
// each row version.
int rawCellCount = TestUtil.getRawCellCount(
conn, TableName.valueOf(tableName), row);
assertEquals((i + 1) * (MAX_COLUMN_INDEX + 1) * versions, rawCellCount);
}
// Run one minor compaction (in case no minor compaction has happened yet)
TestUtil.minorCompact(utility, TableName.valueOf(tableName));
Expand Down