From 553aa4f820fb5d9873657f12b4ce9dc0540d52bb Mon Sep 17 00:00:00 2001 From: chhy2009 Date: Thu, 3 Jul 2025 14:58:26 +0800 Subject: [PATCH] Bugfix: fixed the following bugs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Resolved RPC call would hang in fiber UDP IO pool when exceeding max_conn_num threshold.​ - Adjust the heartbeat reporting logic to after port listening, to avoid service failures from premature traffic during RegisterService-to-port-binding latency. --- trpc/naming/domain/selector_domain_test.cc | 2 +- trpc/server/trpc_server.cc | 37 +++++++++++++------ trpc/server/trpc_server.h | 1 + .../conn_pool/fiber_udp_io_pool_connector.cc | 11 ++++++ .../conn_pool/fiber_udp_io_pool_connector.h | 2 + .../fiber_udp_io_pool_connector_group.cc | 7 ++-- 6 files changed, 43 insertions(+), 17 deletions(-) diff --git a/trpc/naming/domain/selector_domain_test.cc b/trpc/naming/domain/selector_domain_test.cc index 434e161a..00c9cae1 100644 --- a/trpc/naming/domain/selector_domain_test.cc +++ b/trpc/naming/domain/selector_domain_test.cc @@ -78,7 +78,7 @@ TEST(SelectorDomainTest, select_test) { result.context = MakeRefCounted(trpc_codec); result.context->SetCallerName("test_service"); - result.context->SetAddr("192.168.0.1", 1001); + result.context->SetAddr("127.0.0.1", 1001); int ret = ptr->ReportInvokeResult(&result); EXPECT_EQ(0, ret); diff --git a/trpc/server/trpc_server.cc b/trpc/server/trpc_server.cc index 43f57a39..c9199139 100644 --- a/trpc/server/trpc_server.cc +++ b/trpc/server/trpc_server.cc @@ -167,6 +167,8 @@ bool TrpcServer::Start() { if (!iter.second->Listen()) { return false; } + + RegisterServiceHeartBeatInfo(iter.first, iter.second); } else { TRPC_FMT_DEBUG("Service {} is not auto-started.", iter.first); } @@ -310,18 +312,6 @@ TrpcServer::RegisterRetCode TrpcServer::RegisterService(const std::string& servi } } - if (TrpcConfig::GetInstance()->GetGlobalConfig().heartbeat_config.enable_heartbeat) { - TRPC_FMT_DEBUG("service_name:{} report heartbeat info", service_name); - - ServiceHeartBeatInfo heartbeat_info; - auto const& option = service_adapter_it->second->GetServiceAdapterOption(); - heartbeat_info.service_name = service_name; - heartbeat_info.host = option.ip; - heartbeat_info.port = option.port; - heartbeat_info.group_name = option.threadmodel_instance_name; - HeartBeatReport::GetInstance()->RegisterServiceHeartBeatInfo(std::move(heartbeat_info)); - } - return RegisterRetCode::kOk; } @@ -351,6 +341,7 @@ TrpcServer::RegisterRetCode TrpcServer::RegisterService(const ServiceConfig& con } } + RegisterServiceHeartBeatInfo(config.service_name, service_adapter); return RegisterRetCode::kOk; } @@ -374,6 +365,7 @@ bool TrpcServer::StartService(const std::string& service_name) { return false; } + RegisterServiceHeartBeatInfo(service_name, service_adapter_it->second); return true; } @@ -472,4 +464,25 @@ std::shared_ptr GetTrpcServer() { return server; } +void TrpcServer::RegisterServiceHeartBeatInfo(const std::string& service_name, + const ServiceAdapterPtr& service_adapter) { + if (TrpcConfig::GetInstance()->GetGlobalConfig().heartbeat_config.enable_heartbeat) { + if (service_name == admin_service_name_) { + // Admin service does not need to report heartbeat + return; + } + + TRPC_FMT_DEBUG("service_name:{} report heartbeat info", service_name); + + // Register service heartbeat info to heartbeat thread + ServiceHeartBeatInfo heartbeat_info; + auto const& option = service_adapter->GetServiceAdapterOption(); + heartbeat_info.service_name = service_name; + heartbeat_info.host = option.ip; + heartbeat_info.port = option.port; + heartbeat_info.group_name = option.threadmodel_instance_name; + HeartBeatReport::GetInstance()->RegisterServiceHeartBeatInfo(std::move(heartbeat_info)); + } +} + } // namespace trpc diff --git a/trpc/server/trpc_server.h b/trpc/server/trpc_server.h index fc6e5e6d..e9e6c707 100644 --- a/trpc/server/trpc_server.h +++ b/trpc/server/trpc_server.h @@ -149,6 +149,7 @@ class TrpcServer { void BuildTrpcRegistryInfo(const ServiceAdapterOption& option, TrpcRegistryInfo& registry_info); bool CheckSharedTransportConfig(const ServiceConfig& first_service_conf, const ServiceConfig& service_conf); std::string GetSharedKey(const ServiceConfig& config); + void RegisterServiceHeartBeatInfo(const std::string& service_name, const ServiceAdapterPtr& service_adapter); private: // stop running diff --git a/trpc/transport/client/fiber/conn_pool/fiber_udp_io_pool_connector.cc b/trpc/transport/client/fiber/conn_pool/fiber_udp_io_pool_connector.cc index cc424995..24dbbc5e 100644 --- a/trpc/transport/client/fiber/conn_pool/fiber_udp_io_pool_connector.cc +++ b/trpc/transport/client/fiber/conn_pool/fiber_udp_io_pool_connector.cc @@ -61,6 +61,17 @@ void FiberUdpIoPoolConnector::Destroy() { } } +void FiberUdpIoPoolConnector::DoClose() { + // We need to start a fiber for reclaim here to avoid having both reclaim (which depends on read_events being 0 to + // prevent hanging) and message handling occur in the same fiber, which would cause a deadlock. + bool start_fiber = StartFiberDetached([this, ref = RefPtr(ref_ptr, this)]() mutable { + Stop(); + Destroy(); + }); + + TRPC_ASSERT(start_fiber && "StartFiberDetached failed when CloseConnection"); +} + bool FiberUdpIoPoolConnector::CreateFiberUdpTransceiver(uint64_t conn_id) { auto socket = Socket::CreateUdpSocket(options_.is_ipv6); if (!socket.IsValid()) { diff --git a/trpc/transport/client/fiber/conn_pool/fiber_udp_io_pool_connector.h b/trpc/transport/client/fiber/conn_pool/fiber_udp_io_pool_connector.h index 0981928a..d2bde896 100644 --- a/trpc/transport/client/fiber/conn_pool/fiber_udp_io_pool_connector.h +++ b/trpc/transport/client/fiber/conn_pool/fiber_udp_io_pool_connector.h @@ -52,6 +52,8 @@ class FiberUdpIoPoolConnector final : public RefCounted void Destroy(); + void DoClose(); + void SaveCallContext(CTransportReqMsg* req_msg, CTransportRspMsg* rsp_msg, OnCompletionFunction&& cb); void SendReqMsg(CTransportReqMsg* req_msg); diff --git a/trpc/transport/client/fiber/conn_pool/fiber_udp_io_pool_connector_group.cc b/trpc/transport/client/fiber/conn_pool/fiber_udp_io_pool_connector_group.cc index 9544122b..d4898d76 100644 --- a/trpc/transport/client/fiber/conn_pool/fiber_udp_io_pool_connector_group.cc +++ b/trpc/transport/client/fiber/conn_pool/fiber_udp_io_pool_connector_group.cc @@ -180,17 +180,16 @@ void FiberUdpIoPoolConnectorGroup::Reclaim(uint64_t id) { void FiberUdpIoPoolConnectorGroup::Reclaim(int ret, RefPtr& connector) { if (connector->GetConnId() == 0) { - connector->Stop(); - connector->Destroy(); + connector->DoClose(); return; } if (ret == 0) { Reclaim(connector->GetConnId()); } else { - connector->Stop(); - connector->Destroy(); + connector->DoClose(); udp_pool_[connector->GetConnId()] = nullptr; + Reclaim(connector->GetConnId()); } }