Skip to content
5 changes: 3 additions & 2 deletions include/pingcap/kv/Cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct Cluster
Cluster()
: pd_client(std::make_shared<pd::MockPDClient>())
, rpc_client(std::make_unique<RpcClient>())
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(1))
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(2))
, mpp_prober(std::make_unique<common::MPPProber>(this))
{
startBackgroundTasks();
Expand All @@ -48,7 +48,7 @@ struct Cluster
, oracle(std::make_unique<pd::Oracle>(pd_client, std::chrono::milliseconds(oracle_update_interval)))
, lock_resolver(std::make_unique<LockResolver>(this))
, api_version(config.api_version)
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(2))
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(3))
, mpp_prober(std::make_unique<common::MPPProber>(this))
{
startBackgroundTasks();
Expand All @@ -64,6 +64,7 @@ struct Cluster
// (e.g. background threads) that cluster object holds so as to exit elegantly.
~Cluster()
{
rpc_client->stop();
mpp_prober->stop();
if (region_cache)
region_cache->stop();
Expand Down
18 changes: 13 additions & 5 deletions include/pingcap/kv/RegionClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,15 @@ struct RegionClient
if (!status.ok())
{
auto extra_msg = "region_id: " + region_id.toString() + ", addr: " + ctx->addr;
rpc.dropConnIfNeeded(status);
if (status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED)
{
// The rpc is not implemented on this service.
throw Exception("rpc is not implemented: " + rpc.errMsg(status, extra_msg), GRPCNotImplemented);
}
std::string err_msg = rpc.errMsg(status, extra_msg);
log->warning(err_msg);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status);
continue;
}
if (resp->has_region_error())
Expand Down Expand Up @@ -125,11 +126,16 @@ struct RegionClient
{
if (no_resp)
return ::grpc::Status::OK;
return reader->Finish();
auto status = reader->Finish();
if (client != nullptr && shouldRemoveConnOnStatus(status))
client->removeConn(addr);
return status;
}

private:
friend struct RegionClient;
RpcClient * client = nullptr;
std::string addr;
::grpc::ClientContext context;
std::unique_ptr<::grpc::ClientReader<RESP>> reader;
bool no_resp = false;
Expand Down Expand Up @@ -165,6 +171,8 @@ struct RegionClient
}

auto stream_reader = std::make_unique<StreamReader<RESP>>();
stream_reader->client = cluster->rpc_client.get();
stream_reader->addr = ctx->addr;
RpcCall<T> rpc(cluster->rpc_client, ctx->addr);
rpc.setRequestCtx(req, ctx, cluster->api_version);
rpc.setClientContext(stream_reader->context, timeout, meta_data);
Expand Down Expand Up @@ -194,23 +202,23 @@ struct RegionClient
return stream_reader;
}
auto extra_msg = "region_id: " + region_id.toString() + ", addr: " + ctx->addr;
rpc.dropConnIfNeeded(status);
if (status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED)
{

// The rpc is not implemented on this service.
throw Exception("rpc is not implemented: " + rpc.errMsg(status, extra_msg), GRPCNotImplemented);
}
std::string err_msg = rpc.errMsg(status, extra_msg);
log->warning(err_msg);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status);
}
}

protected:
void onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const errorpb::Error & err) const;

// Normally, it happens when machine down or network partition between tidb and kv or process crash.
void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const;
void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const;
};

} // namespace kv
Expand Down
48 changes: 48 additions & 0 deletions include/pingcap/kv/Rpc.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
#pragma once

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <grpcpp/create_channel.h>
#include <pingcap/Log.h>
#include <pingcap/kv/RegionCache.h>
#include <pingcap/kv/internal/type_traits.h>

#include <map>
#include <mutex>
#include <vector>

namespace pingcap
{
namespace kv
{
constexpr auto rpc_conn_check_interval = std::chrono::minutes(5);
constexpr size_t rpc_conn_check_timeout = 2;

struct ConnArray
{
std::mutex mutex;
Expand All @@ -27,6 +34,18 @@ struct ConnArray
std::shared_ptr<KvConnClient> get();
};

// Predicate shared by the unary and streaming failure paths: a status of
// UNAVAILABLE or CANCELLED indicates the underlying channel is likely broken,
// so the cached connection pool for its address should be evicted.
inline bool shouldRemoveConnOnStatus(const ::grpc::Status & status)
{
    const auto code = status.error_code();
    return code == ::grpc::StatusCode::UNAVAILABLE
        || code == ::grpc::StatusCode::CANCELLED;
}

using ConnArrayPtr = std::shared_ptr<ConnArray>;
using GRPCMetaData = std::multimap<std::string, std::string>;

Expand All @@ -38,6 +57,13 @@ struct RpcClient

std::map<std::string, ConnArrayPtr> conns;

Logger * log = &Logger::get("pingcap.RpcClient");
std::chrono::minutes scan_interval = rpc_conn_check_interval;
size_t detect_rpc_timeout = rpc_conn_check_timeout;
std::atomic<bool> stopped = false;
std::mutex scan_mu;
std::condition_variable scan_cv;

RpcClient() = default;

explicit RpcClient(const ClusterConfig & config_)
Expand All @@ -51,13 +77,29 @@ struct RpcClient
conns.clear();
}

void run();

void stop();

void scanConns();

ConnArrayPtr getConnArray(const std::string & addr);

ConnArrayPtr createConnArray(const std::string & addr);

void removeConn(const std::string & addr);

void removeConn(const std::string & addr, const ConnArrayPtr & expected);
};

using RpcClientPtr = std::unique_ptr<RpcClient>;

// Evict the cached connection pool for `addr` when `status` marks the channel
// as broken. Uses shouldRemoveConnOnStatus() so the unary-RPC failure path
// (onSendFail) treats the same codes — UNAVAILABLE and CANCELLED — as
// removable as StreamReader::finish() does; previously only UNAVAILABLE was
// handled here, so CANCELLED failures left a dead pool cached.
inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
{
    if (shouldRemoveConnOnStatus(status))
        client->removeConn(addr);
}
Comment on lines +86 to +90
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Align dropConnIfNeeded() with the shared removal predicate.

shouldRemoveConnOnStatus() treats both UNAVAILABLE and CANCELLED as removable, but dropConnIfNeeded() only handles UNAVAILABLE. Since onSendFail() uses this helper, unary failures and stream setup failures with CANCELLED won’t invalidate the connection, while StreamReader::finish() will.

Proposed fix
 inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
 {
-    if (status.error_code() == grpc::StatusCode::UNAVAILABLE)
+    if (shouldRemoveConnOnStatus(status))
        client->removeConn(addr);
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
{
if (status.error_code() == grpc::StatusCode::UNAVAILABLE)
client->removeConn(addr);
}
inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
{
if (shouldRemoveConnOnStatus(status))
client->removeConn(addr);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `include/pingcap/kv/Rpc.h` around lines 102-106, the helper dropConnIfNeeded
currently treats only grpc::StatusCode::UNAVAILABLE as removable, while
shouldRemoveConnOnStatus also considers CANCELLED; update dropConnIfNeeded to
call shouldRemoveConnOnStatus(status) (i.e., remove the connection when the
status code is UNAVAILABLE or CANCELLED) so that callers such as onSendFail and
stream-setup failures behave consistently with StreamReader::finish and
shouldRemoveConnOnStatus, still calling client->removeConn(addr) when either
code is seen.


// RpcCall holds the request and response, and delegates RPC calls.
template <typename T>
class RpcCall
Expand Down Expand Up @@ -107,6 +149,12 @@ class RpcCall
return msg;
}

void dropConnIfNeeded(const ::grpc::Status & status)
{
if (shouldRemoveConnOnStatus(status))
client->removeConn(addr);
}

private:
const RpcClientPtr & client;
const std::string & addr;
Expand Down
1 change: 1 addition & 0 deletions src/common/MPPProber.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ bool detectStore(kv::RpcClientPtr & rpc_client, const std::string & store_addr,
auto status = rpc.call(&context, req, &resp);
if (!status.ok())
{
rpc.dropConnIfNeeded(status);
log->warning("detect failed: " + store_addr + " error: " + rpc.errMsg(status, ""));
return false;
}
Expand Down
3 changes: 3 additions & 0 deletions src/kv/Cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ void Cluster::startBackgroundTasks()
{
thread_pool->start();

thread_pool->enqueue([this] {
rpc_client->run();
});
thread_pool->enqueue([this] {
mpp_prober->run();
});
Expand Down
3 changes: 2 additions & 1 deletion src/kv/RegionClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ void RegionClient::onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const er
cluster->region_cache->dropRegion(rpc_ctx->region);
}

void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const
void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const
{
dropConnIfNeeded(cluster->rpc_client, rpc_ctx->addr, status);
cluster->region_cache->onSendReqFail(rpc_ctx, e);
// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
Expand Down
101 changes: 101 additions & 0 deletions src/kv/Rpc.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,42 @@
#include <pingcap/Exception.h>
#include <pingcap/kv/Rpc.h>

namespace pingcap
{
namespace kv
{
namespace
{
// A channel counts as healthy when it is READY right now, or manages to
// (re)connect within `rpc_timeout` seconds.
bool isConnValid(const std::shared_ptr<KvConnClient> & conn_client, size_t rpc_timeout)
{
    // GetState(false): observe only, do not trigger a connection attempt.
    if (conn_client->channel->GetState(false) == GRPC_CHANNEL_READY)
        return true;

    // Give the channel a bounded chance to become connected before declaring
    // it dead; WaitForConnected blocks up to the deadline.
    const auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(rpc_timeout);
    return conn_client->channel->WaitForConnected(deadline);
}

// True iff every connection in the pool is healthy. Snapshots the vector under
// the pool mutex so the (potentially blocking) health probes run lock-free.
bool isConnArrayValid(const ConnArrayPtr & conn_array, size_t rpc_timeout)
{
    std::vector<std::shared_ptr<KvConnClient>> clients;
    {
        std::lock_guard<std::mutex> guard(conn_array->mutex);
        clients = conn_array->vec;
    }

    for (const auto & client : clients)
        if (!isConnValid(client, rpc_timeout))
            return false;
    return true;
}
} // namespace

ConnArray::ConnArray(size_t max_size, const std::string & addr, const ClusterConfig & config_)
: address(addr)
, index(0)
Expand All @@ -22,6 +55,60 @@ std::shared_ptr<KvConnClient> ConnArray::get()
return vec[index];
}

// Background loop (enqueued by Cluster::startBackgroundTasks): periodically
// scans cached connection pools and evicts those whose channels are unhealthy.
// Exits when stop() sets `stopped` and signals `scan_cv`.
void RpcClient::run()
{
    while (!stopped.load())
    {
        {
            // Sleep up to scan_interval between scans; the predicate lets
            // stop() end the wait early so shutdown is prompt.
            std::unique_lock lock(scan_mu);
            scan_cv.wait_for(lock, scan_interval, [this] {
                return stopped.load();
            });
        }

        // Re-check after waking: a notified wake-up means shutdown.
        if (stopped.load())
            return;

        try
        {
            scanConns();
        }
        catch (...)
        {
            // A failing scan must not kill the background thread; log the
            // error and retry on the next tick.
            log->warning(getCurrentExceptionMsg("RpcClient scan conns failed: "));
        }
    }
}

// Ask the scan loop to exit and wake it immediately.
void RpcClient::stop()
{
    // Raise the flag first, then notify while holding the lock so a concurrent
    // run() cannot miss the wake-up between its predicate check and its wait.
    stopped.store(true);
    std::scoped_lock lock(scan_mu);
    scan_cv.notify_all();
}

void RpcClient::scanConns()
{
std::vector<std::pair<std::string, ConnArrayPtr>> conn_snapshot;
{
std::lock_guard<std::mutex> lock(mutex);
conn_snapshot.reserve(conns.size());
for (const auto & [addr, conn_array] : conns)
conn_snapshot.emplace_back(addr, conn_array);
}

for (const auto & [addr, conn_array] : conn_snapshot)
{
if (!isConnArrayValid(conn_array, detect_rpc_timeout))
{
log->information("delete unavailable addr: " + addr);
// RpcClient caches a connection pool per address, so drop the whole pool
// and let subsequent requests recreate fresh underlying channels lazily.
removeConn(addr, conn_array);
}
}
}

ConnArrayPtr RpcClient::getConnArray(const std::string & addr)
{
std::lock_guard<std::mutex> lock(mutex);
Expand All @@ -40,5 +127,19 @@ ConnArrayPtr RpcClient::createConnArray(const std::string & addr)
return conn_array;
}

// Unconditionally drop the cached connection pool for `addr`; the next request
// to this address recreates channels lazily via createConnArray().
void RpcClient::removeConn(const std::string & addr)
{
    std::lock_guard<std::mutex> lock(mutex);
    conns.erase(addr);
}

// Compare-and-erase variant used by the background scanner: only drop the pool
// if it is still the same object the scanner snapshotted, so a pool that was
// concurrently recreated for this address is left untouched.
void RpcClient::removeConn(const std::string & addr, const ConnArrayPtr & expected)
{
    std::lock_guard<std::mutex> lock(mutex);
    const auto it = conns.find(addr);
    if (it == conns.end() || it->second != expected)
        return;
    conns.erase(it);
}

} // namespace kv
} // namespace pingcap
Loading