Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion trpc/naming/domain/selector_domain_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ TEST(SelectorDomainTest, select_test) {
result.context = MakeRefCounted<ClientContext>(trpc_codec);
result.context->SetCallerName("test_service");

result.context->SetAddr("192.168.0.1", 1001);
result.context->SetAddr("127.0.0.1", 1001);
int ret = ptr->ReportInvokeResult(&result);
EXPECT_EQ(0, ret);

Expand Down
37 changes: 25 additions & 12 deletions trpc/server/trpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ bool TrpcServer::Start() {
if (!iter.second->Listen()) {
return false;
}

RegisterServiceHeartBeatInfo(iter.first, iter.second);
} else {
TRPC_FMT_DEBUG("Service {} is not auto-started.", iter.first);
}
Expand Down Expand Up @@ -310,18 +312,6 @@ TrpcServer::RegisterRetCode TrpcServer::RegisterService(const std::string& servi
}
}

if (TrpcConfig::GetInstance()->GetGlobalConfig().heartbeat_config.enable_heartbeat) {
TRPC_FMT_DEBUG("service_name:{} report heartbeat info", service_name);

ServiceHeartBeatInfo heartbeat_info;
auto const& option = service_adapter_it->second->GetServiceAdapterOption();
heartbeat_info.service_name = service_name;
heartbeat_info.host = option.ip;
heartbeat_info.port = option.port;
heartbeat_info.group_name = option.threadmodel_instance_name;
HeartBeatReport::GetInstance()->RegisterServiceHeartBeatInfo(std::move(heartbeat_info));
}

return RegisterRetCode::kOk;
}

Expand Down Expand Up @@ -351,6 +341,7 @@ TrpcServer::RegisterRetCode TrpcServer::RegisterService(const ServiceConfig& con
}
}

RegisterServiceHeartBeatInfo(config.service_name, service_adapter);
return RegisterRetCode::kOk;
}

Expand All @@ -374,6 +365,7 @@ bool TrpcServer::StartService(const std::string& service_name) {
return false;
}

RegisterServiceHeartBeatInfo(service_name, service_adapter_it->second);
return true;
}

Expand Down Expand Up @@ -472,4 +464,25 @@ std::shared_ptr<TrpcServer> GetTrpcServer() {
return server;
}

void TrpcServer::RegisterServiceHeartBeatInfo(const std::string& service_name,
const ServiceAdapterPtr& service_adapter) {
if (TrpcConfig::GetInstance()->GetGlobalConfig().heartbeat_config.enable_heartbeat) {
if (service_name == admin_service_name_) {
// Admin service does not need to report heartbeat
return;
}

TRPC_FMT_DEBUG("service_name:{} report heartbeat info", service_name);

// Register service heartbeat info to heartbeat thread
ServiceHeartBeatInfo heartbeat_info;
auto const& option = service_adapter->GetServiceAdapterOption();
heartbeat_info.service_name = service_name;
heartbeat_info.host = option.ip;
heartbeat_info.port = option.port;
heartbeat_info.group_name = option.threadmodel_instance_name;
HeartBeatReport::GetInstance()->RegisterServiceHeartBeatInfo(std::move(heartbeat_info));
}
}

} // namespace trpc
1 change: 1 addition & 0 deletions trpc/server/trpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class TrpcServer {
void BuildTrpcRegistryInfo(const ServiceAdapterOption& option, TrpcRegistryInfo& registry_info);
bool CheckSharedTransportConfig(const ServiceConfig& first_service_conf, const ServiceConfig& service_conf);
std::string GetSharedKey(const ServiceConfig& config);
void RegisterServiceHeartBeatInfo(const std::string& service_name, const ServiceAdapterPtr& service_adapter);

private:
// stop running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ void FiberUdpIoPoolConnector::Destroy() {
}
}

void FiberUdpIoPoolConnector::DoClose() {
// We need to start a fiber for reclaim here to avoid having both reclaim (which depends on read_events being 0 to
// prevent hanging) and message handling occur in the same fiber, which would cause a deadlock.
bool start_fiber = StartFiberDetached([this, ref = RefPtr(ref_ptr, this)]() mutable {
Stop();
Destroy();
});

TRPC_ASSERT(start_fiber && "StartFiberDetached failed when CloseConnection");
}

bool FiberUdpIoPoolConnector::CreateFiberUdpTransceiver(uint64_t conn_id) {
auto socket = Socket::CreateUdpSocket(options_.is_ipv6);
if (!socket.IsValid()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class FiberUdpIoPoolConnector final : public RefCounted<FiberUdpIoPoolConnector>

void Destroy();

void DoClose();

void SaveCallContext(CTransportReqMsg* req_msg, CTransportRspMsg* rsp_msg, OnCompletionFunction&& cb);

void SendReqMsg(CTransportReqMsg* req_msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,16 @@ void FiberUdpIoPoolConnectorGroup::Reclaim(uint64_t id) {

void FiberUdpIoPoolConnectorGroup::Reclaim(int ret, RefPtr<FiberUdpIoPoolConnector>& connector) {
if (connector->GetConnId() == 0) {
connector->Stop();
connector->Destroy();
connector->DoClose();
return;
}

if (ret == 0) {
Reclaim(connector->GetConnId());
} else {
connector->Stop();
connector->Destroy();
connector->DoClose();
udp_pool_[connector->GetConnId()] = nullptr;
Reclaim(connector->GetConnId());
}
}

Expand Down