Skip to content
5 changes: 3 additions & 2 deletions include/pingcap/kv/Cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct Cluster
Cluster()
: pd_client(std::make_shared<pd::MockPDClient>())
, rpc_client(std::make_unique<RpcClient>())
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(1))
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(2))
, mpp_prober(std::make_unique<common::MPPProber>(this))
{
startBackgroundTasks();
Expand All @@ -48,7 +48,7 @@ struct Cluster
, oracle(std::make_unique<pd::Oracle>(pd_client, std::chrono::milliseconds(oracle_update_interval)))
, lock_resolver(std::make_unique<LockResolver>(this))
, api_version(config.api_version)
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(2))
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(3))
, mpp_prober(std::make_unique<common::MPPProber>(this))
{
startBackgroundTasks();
Expand All @@ -64,6 +64,7 @@ struct Cluster
// (e.g. background threads) that cluster object holds so as to exit elegantly.
~Cluster()
{
rpc_client->stop();
mpp_prober->stop();
if (region_cache)
region_cache->stop();
Expand Down
7 changes: 3 additions & 4 deletions include/pingcap/kv/RegionClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ struct RegionClient
}
std::string err_msg = rpc.errMsg(status, extra_msg);
log->warning(err_msg);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status);
continue;
}
if (resp->has_region_error())
Expand Down Expand Up @@ -196,21 +196,20 @@ struct RegionClient
auto extra_msg = "region_id: " + region_id.toString() + ", addr: " + ctx->addr;
if (status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED)
{

// The rpc is not implemented on this service.
throw Exception("rpc is not implemented: " + rpc.errMsg(status, extra_msg), GRPCNotImplemented);
}
std::string err_msg = rpc.errMsg(status, extra_msg);
log->warning(err_msg);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status);
}
}

protected:
void onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const errorpb::Error & err) const;

// Normally, it happens when machine down or network partition between tidb and kv or process crash.
void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const;
void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const;
};

} // namespace kv
Expand Down
37 changes: 36 additions & 1 deletion include/pingcap/kv/Rpc.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
#pragma once

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <grpcpp/create_channel.h>
#include <pingcap/Log.h>
#include <pingcap/kv/RegionCache.h>
#include <pingcap/kv/internal/type_traits.h>

#include <map>
#include <mutex>
#include <vector>

namespace pingcap
{
namespace kv
{
constexpr auto rpc_conn_check_interval = std::chrono::minutes(5);
constexpr size_t rpc_conn_check_timeout = 2;

struct ConnArray
{
std::mutex mutex;
std::string address;

size_t index = 0;
std::vector<std::shared_ptr<KvConnClient>> vec;
std::vector<std::shared_ptr<KvConnClient>> vec; // TODO(xzx) why this is a vector?

ConnArray() = default;

Expand All @@ -38,6 +45,13 @@ struct RpcClient

std::map<std::string, ConnArrayPtr> conns;

Logger * log = &Logger::get("pingcap.RpcClient");
std::chrono::minutes scan_interval = rpc_conn_check_interval;
size_t detect_rpc_timeout = rpc_conn_check_timeout;
std::atomic<bool> stopped = false;
std::condition_variable scan_cv;
std::vector<std::string> invalid_conns;

RpcClient() = default;

explicit RpcClient(const ClusterConfig & config_)
Expand All @@ -49,15 +63,36 @@ struct RpcClient
std::unique_lock lk(mutex);
config = config_;
conns.clear();
invalid_conns.clear();
}

void run();

void stop();

void scanConns();

void markConnInvalid(const std::string & addr);

void removeInvalidConns();

ConnArrayPtr getConnArray(const std::string & addr);

ConnArrayPtr createConnArray(const std::string & addr);

void removeConn(const std::string & addr);

void removeConn(const std::string & addr, const ConnArrayPtr & expected);
};

using RpcClientPtr = std::unique_ptr<RpcClient>;

// Invalidate the cached connection pool for `addr` when `status` indicates
// the underlying channel is likely broken, so the background scanner thread
// rebuilds it instead of retrying on a dead channel.
//
// NOTE(review): the stream-side removal path (StreamReader::finish /
// shouldRemoveConnOnStatus) reportedly treats both UNAVAILABLE and CANCELLED
// as removable; handle both here so unary send failures invalidate the
// connection consistently with stream failures — confirm against that helper.
inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
{
    const auto code = status.error_code();
    if (code == ::grpc::StatusCode::UNAVAILABLE || code == ::grpc::StatusCode::CANCELLED)
        client->markConnInvalid(addr);
}
Comment on lines +86 to +90
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Align dropConnIfNeeded() with the shared removal predicate.

shouldRemoveConnOnStatus() treats both UNAVAILABLE and CANCELLED as removable, but dropConnIfNeeded() only handles UNAVAILABLE. Since onSendFail() uses this helper, unary failures and stream setup failures with CANCELLED won’t invalidate the connection, while StreamReader::finish() will.

Proposed fix
 inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
 {
-    if (status.error_code() == grpc::StatusCode::UNAVAILABLE)
+    if (shouldRemoveConnOnStatus(status))
         client->markConnInvalid(addr);
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
{
if (status.error_code() == grpc::StatusCode::UNAVAILABLE)
client->markConnInvalid(addr);
}
inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
{
if (shouldRemoveConnOnStatus(status))
client->markConnInvalid(addr);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/pingcap/kv/Rpc.h` around lines 102 - 106, The helper dropConnIfNeeded
currently only treats grpc::StatusCode::UNAVAILABLE as removable while
shouldRemoveConnOnStatus also considers CANCELLED; update dropConnIfNeeded to
mark the connection invalid for both UNAVAILABLE and CANCELLED (i.e., check for
status.error_code() == UNAVAILABLE || status.error_code() == CANCELLED) so that
callers like onSendFail and stream setup failures behave consistently with
StreamReader::finish and shouldRemoveConnOnStatus, still calling
client->markConnInvalid(addr) when either code is seen.


// RpcCall holds the request and response, and delegates RPC calls.
template <typename T>
class RpcCall
Expand Down
3 changes: 3 additions & 0 deletions src/kv/Cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ void Cluster::startBackgroundTasks()
{
thread_pool->start();

thread_pool->enqueue([this] {
rpc_client->run();
});
thread_pool->enqueue([this] {
mpp_prober->run();
});
Expand Down
3 changes: 2 additions & 1 deletion src/kv/RegionClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ void RegionClient::onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const er
cluster->region_cache->dropRegion(rpc_ctx->region);
}

void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const
void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const
{
dropConnIfNeeded(cluster->rpc_client, rpc_ctx->addr, status);
cluster->region_cache->onSendReqFail(rpc_ctx, e);
// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
Expand Down
126 changes: 126 additions & 0 deletions src/kv/Rpc.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,39 @@
#include <pingcap/Exception.h>
#include <pingcap/kv/Rpc.h>

namespace pingcap
{
namespace kv
{
namespace
{
// Returns true when the client's gRPC channel is usable: either it is
// already READY, or it reaches the connected state within `rpc_timeout`
// seconds.
bool isConnValid(const std::shared_ptr<KvConnClient> & conn_client, size_t rpc_timeout)
{
    // Fast path: a READY channel needs no waiting at all.
    if (conn_client->channel->GetState(false) == GRPC_CHANNEL_READY)
        return true;

    // Slow path: give the channel a bounded grace period to come up.
    const auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(rpc_timeout);
    return conn_client->channel->WaitForConnected(deadline);
}

// Returns true only when every connection in the pool passes isConnValid().
// A single broken channel marks the whole ConnArray as invalid.
bool isConnArrayValid(const ConnArrayPtr & conn_array, size_t rpc_timeout)
{
    // Copy the client list under the lock; the validity probes below may
    // block for up to `rpc_timeout` seconds each, and must not run while
    // holding conn_array->mutex.
    std::vector<std::shared_ptr<KvConnClient>> clients;
    {
        std::lock_guard<std::mutex> guard(conn_array->mutex);
        clients = conn_array->vec;
    }

    for (const auto & client : clients)
    {
        if (!isConnValid(client, rpc_timeout))
            return false;
    }
    return true;
}
} // namespace

ConnArray::ConnArray(size_t max_size, const std::string & addr, const ClusterConfig & config_)
: address(addr)
, index(0)
Expand All @@ -22,6 +52,88 @@ std::shared_ptr<KvConnClient> ConnArray::get()
return vec[index];
}

// Background maintenance loop, executed on the cluster's thread pool.
// Wakes either when a failed RPC has queued an address via markConnInvalid()
// (fast path) or after `scan_interval` elapses (periodic health scan).
// Exits when stop() flips `stopped`.
void RpcClient::run()
{
    while (!stopped.load())
    {
        bool has_invalid_conns = false;
        {
            std::unique_lock lock(mutex);
            // Predicate-form wait_for: returns early only on stop() or a
            // queued invalid address; otherwise falls through on timeout.
            scan_cv.wait_for(lock, scan_interval, [this] {
                return stopped.load() || !invalid_conns.empty();
            });
            // Capture under the lock; invalid_conns is mutated by other threads.
            has_invalid_conns = !invalid_conns.empty();
        }

        if (stopped.load())
            return;

        if (has_invalid_conns)
        {
            // Fast path: drop explicitly reported pools and go back to
            // waiting without paying for a full channel scan.
            removeInvalidConns();
            continue;
        }

        try
        {
            // Periodic path: probe every pool, then drop any that failed.
            scanConns();
            removeInvalidConns();
        }
        catch (...)
        {
            // Best-effort maintenance: log and keep the loop alive.
            log->warning(getCurrentExceptionMsg("RpcClient scan conns failed: "));
        }
    }
}

void RpcClient::stop()
{
stopped.store(true);
scan_cv.notify_all();
}

void RpcClient::scanConns()
{
std::vector<std::pair<std::string, ConnArrayPtr>> conn_snapshot;
{
std::lock_guard<std::mutex> lock(mutex);
conn_snapshot.reserve(conns.size());
for (const auto & [addr, conn_array] : conns)
conn_snapshot.emplace_back(addr, conn_array);
}

for (const auto & [addr, conn_array] : conn_snapshot)
{
if (!isConnArrayValid(conn_array, detect_rpc_timeout))
{
std::lock_guard<std::mutex> lock(mutex);
invalid_conns.push_back(addr);
}
}
}

// Queue `addr` for removal and wake the background scanner thread so the
// stale pool is dropped promptly instead of waiting for the next periodic
// scan tick.
void RpcClient::markConnInvalid(const std::string & addr)
{
    std::lock_guard<std::mutex> guard(mutex);
    invalid_conns.emplace_back(addr);
    scan_cv.notify_all();
}

// Drain the pending invalidation queue: erase the connection pool of every
// queued address from `conns`. Runs on the scanner thread (run()).
// NOTE(review): entries are plain addresses, so if another thread recreated
// conns[addr] after the address was queued, the fresh pool is erased too —
// consider the conditional removeConn(addr, expected) overload for that case.
void RpcClient::removeInvalidConns()
{
    std::lock_guard<std::mutex> lock(mutex);
    if (invalid_conns.empty())
        return;

    for (const auto & addr : invalid_conns)
    {
        // Erasing drops the map's ConnArrayPtr; in-flight users holding a
        // shared_ptr copy keep their connection alive until they release it.
        log->information("delete unavailable addr: " + addr);
        conns.erase(addr);
    }

    invalid_conns.clear();
Comment on lines +95 to +134
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don’t erase newly recreated connection pools from stale scan results.

scanConns() validates a snapshot, then queues only addr. If another thread recreates conns[addr] before removeInvalidConns(), the later address-only erase can delete the fresh healthy pool. Use the captured ConnArrayPtr as the expected value when removing scan-detected failures.

One possible fix using the existing conditional remover
 void RpcClient::scanConns()
 {
     std::vector<std::pair<std::string, ConnArrayPtr>> conn_snapshot;
@@
     for (const auto & [addr, conn_array] : conn_snapshot)
     {
         if (!isConnArrayValid(conn_array, detect_rpc_timeout))
         {
-            std::lock_guard<std::mutex> lock(mutex);
-            invalid_conns.push_back(addr);
+            removeConn(addr, conn_array);
         }
     }
 }

If the scan path still needs batched logging/removal, store (addr, expected_conn_array) in invalid_conns and compare the pointer before erasing.

🧰 Tools
🪛 Clang (14.0.6)

[error] 95-95: method 'scanConns' can be made static

(readability-convert-member-functions-to-static,-warnings-as-errors)


[error] 122-122: method 'removeInvalidConns' can be made static

(readability-convert-member-functions-to-static,-warnings-as-errors)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/kv/Rpc.cc` around lines 95 - 134, scanConns currently snapshots (addr,
ConnArrayPtr) but only pushes addr into invalid_conns, which can cause
removeInvalidConns to erase a newly recreated pool; change the invalid_conns
container to hold pairs of (std::string addr, ConnArrayPtr expected) and in
scanConns push the captured ConnArrayPtr, update markConnInvalid to push a pair
with the current conns[addr] (or nullptr if called externally), and modify
removeInvalidConns to check that conns[addr] exists and equals the expected
ConnArrayPtr before erasing (skip if different), leaving semantics of logging
and clearing the invalid_conns intact.

}

ConnArrayPtr RpcClient::getConnArray(const std::string & addr)
{
std::lock_guard<std::mutex> lock(mutex);
Expand All @@ -40,5 +152,19 @@ ConnArrayPtr RpcClient::createConnArray(const std::string & addr)
return conn_array;
}

// Unconditionally drop whatever connection pool is cached for `addr`;
// erasing an absent key is a harmless no-op.
void RpcClient::removeConn(const std::string & addr)
{
    std::lock_guard<std::mutex> guard(mutex);
    conns.erase(addr);
}

// Compare-and-erase removal: drop the pool for `addr` only while the map
// still holds the exact ConnArray the caller observed. This keeps a pool
// recreated by another thread (after the caller took its snapshot) intact.
void RpcClient::removeConn(const std::string & addr, const ConnArrayPtr & expected)
{
    std::lock_guard<std::mutex> guard(mutex);
    const auto it = conns.find(addr);
    if (it == conns.end() || it->second != expected)
        return;
    conns.erase(it);
}

} // namespace kv
} // namespace pingcap
Loading