diff --git a/include/pingcap/kv/Cluster.h b/include/pingcap/kv/Cluster.h index ec2eac07..6561f79f 100644 --- a/include/pingcap/kv/Cluster.h +++ b/include/pingcap/kv/Cluster.h @@ -35,7 +35,7 @@ struct Cluster Cluster() : pd_client(std::make_shared()) , rpc_client(std::make_unique()) - , thread_pool(std::make_unique(1)) + , thread_pool(std::make_unique(2)) , mpp_prober(std::make_unique(this)) { startBackgroundTasks(); @@ -48,7 +48,7 @@ struct Cluster , oracle(std::make_unique(pd_client, std::chrono::milliseconds(oracle_update_interval))) , lock_resolver(std::make_unique(this)) , api_version(config.api_version) - , thread_pool(std::make_unique(2)) + , thread_pool(std::make_unique(3)) , mpp_prober(std::make_unique(this)) { startBackgroundTasks(); @@ -64,6 +64,7 @@ struct Cluster // (e.g. background threads) that cluster object holds so as to exit elegantly. ~Cluster() { + rpc_client->stop(); mpp_prober->stop(); if (region_cache) region_cache->stop(); diff --git a/include/pingcap/kv/RegionClient.h b/include/pingcap/kv/RegionClient.h index 4da18096..cdc573d0 100644 --- a/include/pingcap/kv/RegionClient.h +++ b/include/pingcap/kv/RegionClient.h @@ -81,7 +81,7 @@ struct RegionClient } std::string err_msg = rpc.errMsg(status, extra_msg); log->warning(err_msg); - onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx); + onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status); continue; } if (resp->has_region_error()) @@ -196,13 +196,12 @@ struct RegionClient auto extra_msg = "region_id: " + region_id.toString() + ", addr: " + ctx->addr; if (status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) { - // The rpc is not implemented on this service. 
throw Exception("rpc is not implemented: " + rpc.errMsg(status, extra_msg), GRPCNotImplemented); } std::string err_msg = rpc.errMsg(status, extra_msg); log->warning(err_msg); - onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx); + onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status); } } @@ -210,7 +209,7 @@ struct RegionClient void onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const errorpb::Error & err) const; // Normally, it happens when machine down or network partition between tidb and kv or process crash. - void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const; + void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const; }; } // namespace kv diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index 7439b2fd..b0f98078 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -1,10 +1,14 @@ #pragma once +#include +#include +#include #include #include #include #include +#include #include #include @@ -12,6 +16,9 @@ namespace pingcap { namespace kv { +constexpr auto rpc_conn_check_interval = std::chrono::minutes(5); +constexpr size_t rpc_conn_check_timeout = 2; + struct ConnArray { std::mutex mutex; @@ -38,6 +45,13 @@ struct RpcClient std::map conns; + Logger * log = &Logger::get("pingcap.RpcClient"); + std::chrono::minutes scan_interval = rpc_conn_check_interval; + size_t detect_rpc_timeout = rpc_conn_check_timeout; + std::atomic stopped = false; + std::condition_variable scan_cv; + std::vector invalid_conns; + RpcClient() = default; explicit RpcClient(const ClusterConfig & config_) @@ -49,8 +63,19 @@ struct RpcClient std::unique_lock lk(mutex); config = config_; conns.clear(); + invalid_conns.clear(); } + void run(); + + void stop(); + + void scanConns(); + + void markConnInvalid(const std::string & addr); + + void removeInvalidConns(); + ConnArrayPtr getConnArray(const std::string & addr); ConnArrayPtr createConnArray(const 
std::string & addr); @@ -58,6 +83,12 @@ struct RpcClient using RpcClientPtr = std::unique_ptr; +inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status) +{ + if (status.error_code() == grpc::StatusCode::UNAVAILABLE) + client->markConnInvalid(addr); +} + // RpcCall holds the request and response, and delegates RPC calls. template class RpcCall diff --git a/src/kv/Cluster.cc b/src/kv/Cluster.cc index 70b6c903..a5a10eb0 100644 --- a/src/kv/Cluster.cc +++ b/src/kv/Cluster.cc @@ -31,6 +31,9 @@ void Cluster::startBackgroundTasks() { thread_pool->start(); + thread_pool->enqueue([this] { + rpc_client->run(); + }); thread_pool->enqueue([this] { mpp_prober->run(); }); diff --git a/src/kv/RegionClient.cc b/src/kv/RegionClient.cc index 6c369f92..faf4599c 100644 --- a/src/kv/RegionClient.cc +++ b/src/kv/RegionClient.cc @@ -106,8 +106,9 @@ void RegionClient::onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const er cluster->region_cache->dropRegion(rpc_ctx->region); } -void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const +void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const { + dropConnIfNeeded(cluster->rpc_client, rpc_ctx->addr, status); cluster->region_cache->onSendReqFail(rpc_ctx, e); // Retry on send request failure when it's not canceled. // When a store is not available, the leader of related region should be elected quickly. 
diff --git a/src/kv/Rpc.cc b/src/kv/Rpc.cc
index dddb4bc9..f419995c 100644
--- a/src/kv/Rpc.cc
+++ b/src/kv/Rpc.cc
@@ -1,9 +1,43 @@
+#include
 #include
 
 namespace pingcap
 {
 namespace kv
 {
+namespace
+{
+// Reports whether the channel is READY right now or, failing that, becomes
+// connected before a deadline `rpc_timeout` seconds from now.
+bool isConnValid(const std::shared_ptr & conn_client, size_t rpc_timeout)
+{
+    auto state = conn_client->channel->GetState(false);
+    if (state == GRPC_CHANNEL_READY)
+        return true;
+
+    auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(rpc_timeout);
+    return conn_client->channel->WaitForConnected(deadline);
+}
+
+// Copies the connection list while holding the array's mutex, then probes each
+// connection without that lock; returns false on the first failed probe.
+bool isConnArrayValid(const ConnArrayPtr & conn_array, size_t rpc_timeout)
+{
+    std::vector> conn_snapshot;
+    {
+        std::lock_guard lock(conn_array->mutex);
+        conn_snapshot = conn_array->vec;
+    }
+
+    for (const auto & conn_client : conn_snapshot)
+    {
+        if (!isConnValid(conn_client, rpc_timeout))
+            return false;
+    }
+    return true;
+}
+} // namespace
+
 ConnArray::ConnArray(size_t max_size, const std::string & addr, const ClusterConfig & config_)
     : address(addr)
     , index(0)
@@ -22,6 +56,102 @@ std::shared_ptr ConnArray::get()
     return vec[index];
 }
 
+// Background loop: waits up to `scan_interval`, waking early on stop() or when
+// an address has been marked invalid. Marked addresses are dropped right away;
+// when the wait simply times out, all cached connections are scanned instead.
+void RpcClient::run()
+{
+    while (!stopped.load())
+    {
+        bool has_invalid_conns = false;
+        {
+            std::unique_lock lock(mutex);
+            scan_cv.wait_for(lock, scan_interval, [this] {
+                return stopped.load() || !invalid_conns.empty();
+            });
+            has_invalid_conns = !invalid_conns.empty();
+        }
+
+        if (stopped.load())
+            return;
+
+        if (has_invalid_conns)
+        {
+            removeInvalidConns();
+            continue;
+        }
+
+        try
+        {
+            scanConns();
+            removeInvalidConns();
+        }
+        catch (...)
+        {
+            log->warning(getCurrentExceptionMsg("RpcClient scan conns failed: "));
+        }
+    }
+}
+
+// Asks run() to exit. `stopped` is set while holding `mutex` so the change
+// cannot slip in between run()'s predicate check and its wait on `scan_cv`;
+// without the lock the notification could be lost and run() would keep
+// sleeping for the remainder of `scan_interval`.
+void RpcClient::stop()
+{
+    {
+        std::lock_guard lock(mutex);
+        stopped.store(true);
+    }
+    scan_cv.notify_all();
+}
+
+// Snapshots `conns` under `mutex`, probes each connection array without the
+// lock held, and queues the address of every array that fails the probe.
+void RpcClient::scanConns()
+{
+    std::vector> conn_snapshot;
+    {
+        std::lock_guard lock(mutex);
+        conn_snapshot.reserve(conns.size());
+        for (const auto & [addr, conn_array] : conns)
+            conn_snapshot.emplace_back(addr, conn_array);
+    }
+
+    for (const auto & [addr, conn_array] : conn_snapshot)
+    {
+        if (!isConnArrayValid(conn_array, detect_rpc_timeout))
+        {
+            std::lock_guard lock(mutex);
+            invalid_conns.push_back(addr);
+        }
+    }
+}
+
+// Queues `addr` for removal and notifies `scan_cv` so run() wakes up.
+void RpcClient::markConnInvalid(const std::string & addr)
+{
+    std::lock_guard lock(mutex);
+    invalid_conns.push_back(addr);
+    scan_cv.notify_all();
+}
+
+// Erases every queued address from `conns`, then clears the queue.
+void RpcClient::removeInvalidConns()
+{
+    std::lock_guard lock(mutex);
+    if (invalid_conns.empty())
+        return;
+
+    for (const auto & addr : invalid_conns)
+    {
+        log->information("delete unavailable addr: " + addr);
+        conns.erase(addr);
+    }
+
+    invalid_conns.clear();
+}
+
 ConnArrayPtr RpcClient::getConnArray(const std::string & addr)
 {
     std::lock_guard lock(mutex);
@@ -39,6 +169,5 @@ ConnArrayPtr RpcClient::createConnArray(const std::string & addr)
     conns[addr] = conn_array;
     return conn_array;
 }
-
 } // namespace kv
 } // namespace pingcap