Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -424,74 +424,76 @@ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,

Connection connection = null;
RegionLocator regionLocator = null;
if (localityEnabled && useRegionLoc) {
Configuration newConf = new Configuration(conf);
newConf.setInt("hbase.hconnection.threads.max", 1);
try {
List<InputSplit> splits = new ArrayList<>();
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use try-with-resources here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

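The resources are only initialized when the condition (localityEnabled && useRegionLoc) holds, so they cannot simply be declared in a try-with-resources header.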

if (localityEnabled && useRegionLoc) {
Configuration newConf = new Configuration(conf);
newConf.setInt("hbase.hconnection.threads.max", 1);

connection = ConnectionFactory.createConnection(newConf);
regionLocator = connection.getRegionLocator(htd.getTableName());

/* Get all locations for the table and cache it */
regionLocator.getAllRegionLocations();
} finally {
if (connection != null) {
connection.close();
}
}
}

List<InputSplit> splits = new ArrayList<>();
for (RegionInfo hri : regionManifests) {
// load region descriptor
List<String> hosts = null;
if (localityEnabled) {
if (regionLocator != null) {
/* Get Location from the local cache */
HRegionLocation location = regionLocator.getRegionLocation(hri.getStartKey(), false);

hosts = new ArrayList<>(1);
hosts.add(location.getHostname());
} else {
hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir);
for (RegionInfo hri : regionManifests) {
// load region descriptor
List<String> hosts = null;
if (localityEnabled) {
if (regionLocator != null) {
/* Get Location from the local cache */
HRegionLocation location = regionLocator.getRegionLocation(hri.getStartKey(), false);

hosts = new ArrayList<>(1);
hosts.add(location.getHostname());
} else {
hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir);
}
}
}

if (numSplits > 1) {
byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
for (int i = 0; i < sp.length - 1; i++) {
if (numSplits > 1) {
byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
for (int i = 0; i < sp.length - 1; i++) {
if (
PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
sp[i + 1])
) {

Scan boundedScan = new Scan(scan);
if (scan.getStartRow().length == 0) {
boundedScan.withStartRow(sp[i]);
} else {
boundedScan.withStartRow(
Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
}

if (scan.getStopRow().length == 0) {
boundedScan.withStopRow(sp[i + 1]);
} else {
boundedScan.withStopRow(Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0
? scan.getStopRow()
: sp[i + 1]);
}

splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
}
}
} else {
if (
PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i], sp[i + 1])
PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
hri.getStartKey(), hri.getEndKey())
) {

Scan boundedScan = new Scan(scan);
if (scan.getStartRow().length == 0) {
boundedScan.withStartRow(sp[i]);
} else {
boundedScan.withStartRow(
Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
}

if (scan.getStopRow().length == 0) {
boundedScan.withStopRow(sp[i + 1]);
} else {
boundedScan.withStopRow(
Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
}

splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
}
}
} else {
if (
PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
hri.getEndKey())
) {

splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
}
}
} finally {
if (connection != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe all of these changes are just because we added an extra block {} and the code got shifted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

connection.close();
}
}

return splits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
Expand All @@ -47,6 +48,7 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
Expand Down Expand Up @@ -683,4 +685,49 @@ public void testReadFromRestoredSnapshotViaMR() throws Exception {
}
}
}

/**
 * Exercises {@code TableSnapshotInputFormatImpl.getSplits} with
 * {@code SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION} switched on.
 * <p>
 * The test creates a table and a snapshot of it, asks for one split per region, and then checks
 * that a non-empty list of splits comes back and that every split reports a non-null locations
 * array. The snapshot, the table and the configuration flag are cleaned up afterwards regardless
 * of the outcome.
 */
@Test
public void testRegionLocatorUsesCache() throws Exception {
  final String snapshotName = "snapshot";
  final TableName table = TableName.valueOf("testRegionLocatorCache");
  Configuration configuration = UTIL.getConfiguration();
  try {
    // Table pre-split into several regions, plus a snapshot of it.
    createTableAndSnapshot(UTIL, table, snapshotName, getStartRow(), getEndRow(), 3);

    // Ask the input format to derive split locality from region locations.
    configuration.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, true);

    Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);

    // Load the snapshot manifest and the regions it describes.
    SnapshotManifest snapshotManifest = TableSnapshotInputFormatImpl.getSnapshotManifest(
      configuration, snapshotName, CommonFSUtils.getRootDir(configuration),
      UTIL.getTestFileSystem());
    List<RegionInfo> regions =
      TableSnapshotInputFormatImpl.getRegionInfosFromManifest(snapshotManifest);

    // An unbounded scan, so no region is filtered out by key range.
    Scan fullScan = new Scan();

    List<TableSnapshotInputFormatImpl.InputSplit> computedSplits =
      TableSnapshotInputFormatImpl.getSplits(fullScan, snapshotManifest, regions, restoreDir,
        configuration, new RegionSplitter.UniformSplit(), 1);

    // At least one split must have been produced.
    Assert.assertNotNull(computedSplits);
    Assert.assertFalse(computedSplits.isEmpty());

    // Every split must expose a locations array.
    for (TableSnapshotInputFormatImpl.InputSplit computedSplit : computedSplits) {
      Assert.assertNotNull(computedSplit.getLocations());
    }
  } finally {
    UTIL.getAdmin().deleteSnapshot(snapshotName);
    UTIL.deleteTable(table);
    // UTIL's configuration is shared, so do not leak the flag into other tests.
    configuration.unset(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION);
  }
}
}