From 0f46ff8afbcb47f64c54f627d151ed2f492bb82e Mon Sep 17 00:00:00 2001 From: Daniel Foehr <33809186+danielfoehrKn@users.noreply.github.com> Date: Fri, 22 May 2026 10:54:34 -0400 Subject: [PATCH] feat(metrics): add receive channel blocking latency and logs --- pkg/agent/client.go | 7 +++++++ pkg/server/metrics/metrics.go | 21 +++++++++++++++++++++ pkg/server/server.go | 12 ++++++++++-- pkg/server/tunnel.go | 8 ++++++-- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/pkg/agent/client.go b/pkg/agent/client.go index 1382427f0..a395676b2 100644 --- a/pkg/agent/client.go +++ b/pkg/agent/client.go @@ -577,9 +577,16 @@ func (a *Client) remoteToProxy(connID int64, eConn *endpointConn) { Data: buf[:n], ConnectID: connID, }} + + klog.V(4).InfoS("sending data to kube-apiserver", "bytes", n, "connectionID", connID) + sendStart := time.Now() if err := a.Send(resp); err != nil { klog.ErrorS(err, "could not send DATA", "connectionID", connID) } + sendLatency := time.Since(sendStart) + if sendLatency > 10*time.Millisecond { + klog.V(3).InfoS("slow send to kube-apiserver", "latency", sendLatency, "bytes", n, "connectionID", connID) + } klog.V(4).InfoS("send data to server successfully", "bytes", n, "connectionID", connID) } } diff --git a/pkg/server/metrics/metrics.go b/pkg/server/metrics/metrics.go index 9a89d6ee6..48df63679 100644 --- a/pkg/server/metrics/metrics.go +++ b/pkg/server/metrics/metrics.go @@ -54,6 +54,7 @@ type ServerMetrics struct { pendingDials *prometheus.GaugeVec establishedConns *prometheus.GaugeVec fullRecvChannels *prometheus.GaugeVec + fullRecvChannelWaits *prometheus.HistogramVec dialFailures *prometheus.CounterVec streamPackets *prometheus.CounterVec streamErrors *prometheus.CounterVec @@ -143,6 +144,18 @@ func newServerMetrics() *ServerMetrics { "service_method", }, ) + fullRecvChannelWaits := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "full_receive_channel_wait_seconds", + 
Help: "Time spent blocked on a full receive channel before the send completed, partitioned by service method.", + Buckets: latencyBuckets, + }, + []string{ + "service_method", + }, + ) dialFailures := prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, @@ -206,6 +219,7 @@ func newServerMetrics() *ServerMetrics { prometheus.MustRegister(pendingDials) prometheus.MustRegister(establishedConns) prometheus.MustRegister(fullRecvChannels) + prometheus.MustRegister(fullRecvChannelWaits) prometheus.MustRegister(dialFailures) prometheus.MustRegister(streamPackets) prometheus.MustRegister(streamErrors) @@ -223,6 +237,7 @@ func newServerMetrics() *ServerMetrics { pendingDials: pendingDials, establishedConns: establishedConns, fullRecvChannels: fullRecvChannels, + fullRecvChannelWaits: fullRecvChannelWaits, dialFailures: dialFailures, streamPackets: streamPackets, streamErrors: streamErrors, @@ -243,6 +258,7 @@ func (s *ServerMetrics) Reset() { s.pendingDials.Reset() s.establishedConns.Reset() s.fullRecvChannels.Reset() + s.fullRecvChannelWaits.Reset() s.dialFailures.Reset() s.streamPackets.Reset() s.streamErrors.Reset() @@ -299,6 +315,11 @@ func (s *ServerMetrics) FullRecvChannel(serviceMethod string) prometheus.Gauge { return s.fullRecvChannels.With(prometheus.Labels{"service_method": serviceMethod}) } +// ObserveFullRecvChannelWait records how long a send blocked on a full receive channel. 
+func (s *ServerMetrics) ObserveFullRecvChannelWait(serviceMethod string, elapsed time.Duration) { + s.fullRecvChannelWaits.With(prometheus.Labels{"service_method": serviceMethod}).Observe(elapsed.Seconds()) +} + type DialFailureReason string const ( diff --git a/pkg/server/server.go b/pkg/server/server.go index fa354f9b7..53ebd6945 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -497,7 +497,9 @@ func (s *ProxyServer) readFrontendToChannel(frontend *GrpcFrontend, userAgent [] klog.V(2).InfoS("Receive channel from frontend is full", "userAgent", userAgent) fullRecvChannelMetric := metrics.Metrics.FullRecvChannel(metrics.Proxy) fullRecvChannelMetric.Inc() + start := time.Now() recvCh <- in + metrics.Metrics.ObserveFullRecvChannelWait(metrics.Proxy, time.Since(start)) fullRecvChannelMetric.Dec() } } @@ -833,7 +835,11 @@ func (s *ProxyServer) readBackendToChannel(backend *Backend, recvCh chan *client klog.V(2).InfoS("Receive channel from agent is full", "agentID", agentID) fullRecvChannelMetric := metrics.Metrics.FullRecvChannel(metrics.Connect) fullRecvChannelMetric.Inc() + start := time.Now() recvCh <- in + latency := time.Since(start) + metrics.Metrics.ObserveFullRecvChannelWait(metrics.Connect, latency) + klog.V(2).InfoS("Latency: Receive channel from agent is full", "agentID", agentID, "latency", latency.Milliseconds()) fullRecvChannelMetric.Dec() } } @@ -982,7 +988,8 @@ func (s *ProxyServer) serveRecvBackend(backend *Backend, agentID string, recvCh break } if err := frontend.send(pkt); err != nil { - klog.ErrorS(err, "send to client stream failure", "agentID", agentID, "connectionID", resp.ConnectID) + // Likely cause: Kube-apiserver already terminated the connection with k-server. 
+ klog.V(4).InfoS("send to client stream failure (receiving DATA)", "err", err.Error(), "agentID", agentID, "connectionID", resp.ConnectID) } else { klog.V(5).InfoS("DATA sent to frontend") } @@ -993,7 +1000,8 @@ func (s *ProxyServer) serveRecvBackend(backend *Backend, agentID string, recvCh klog.V(4).InfoS("Received data ACK from agent", "agentID", agentID, "connectionID", resp.ConnectID) frontend, err := s.getFrontend(agentID, resp.ConnectID) if err != nil { - klog.ErrorS(err, "could not get frontent client") + // Likely cause: Kube-apiserver already terminated the connection with k-server. + klog.V(4).InfoS("could not get frontend client (receiving DATA_ACK)", "err", err.Error(), "agentID", agentID, "connectionID", resp.ConnectID) break } diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go index 74181298d..01a23a703 100644 --- a/pkg/server/tunnel.go +++ b/pkg/server/tunnel.go @@ -159,6 +159,8 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if err != nil { klog.ErrorS(err, "Received failure on connection") + // Likely cause: Kube-apiserver already terminated the connection with k-server. + klog.V(4).InfoS("Received failure on connection: frontend likely closed connection", "err", err.Error(), "host", r.Host, "agentID", agentID, "connectionID", connID) break } @@ -179,13 +181,15 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !acquired { start := time.Now() - klog.InfoS("Semaphore full, waiting for client receive window > 0", "start", start.String(), "host", r.Host, "agentID", agentID, "connectionID", connID) + klog.V(4).InfoS("Semaphore full, waiting for client receive window > 0", "start", start.String(), "host", r.Host, "agentID", agentID, "connectionID", connID) // Blocking: if semaphore is full (waits till server.go serveRecvBackend() - which receives packets via the grpc stream from an agent - receives // an ACK packet which releases 1 from the semaphore. 
connection.flow.Acquire(context.Background(), 1) latency := time.Now().Sub(start) - klog.V(3).InfoS("Latency when waiting for client receive window > 0", "latency", latency.Milliseconds(), "start", start.String(), "host", r.Host, "agentID", agentID, "connectionID", connID) + if latency > time.Millisecond { + klog.V(2).InfoS("Latency when waiting for client receive window > 0", "latency", latency.Milliseconds(), "start", start.String(), "host", r.Host, "agentID", agentID, "connectionID", connID) + } } } else {