Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,9 +577,16 @@ func (a *Client) remoteToProxy(connID int64, eConn *endpointConn) {
Data: buf[:n],
ConnectID: connID,
}}

klog.V(4).InfoS("sending data to kube-apiserver", "bytes", n, "connectionID", connID)
sendStart := time.Now()
if err := a.Send(resp); err != nil {
klog.ErrorS(err, "could not send DATA", "connectionID", connID)
}
sendLatency := time.Since(sendStart)
if sendLatency > 10*time.Millisecond {
klog.V(3).InfoS("slow send to kube-apiserver", "latency", sendLatency, "bytes", n, "connectionID", connID)
}
klog.V(4).InfoS("send data to server successfully", "bytes", n, "connectionID", connID)
}
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/server/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ServerMetrics struct {
pendingDials *prometheus.GaugeVec
establishedConns *prometheus.GaugeVec
fullRecvChannels *prometheus.GaugeVec
fullRecvChannelWaits *prometheus.HistogramVec
dialFailures *prometheus.CounterVec
streamPackets *prometheus.CounterVec
streamErrors *prometheus.CounterVec
Expand Down Expand Up @@ -143,6 +144,18 @@ func newServerMetrics() *ServerMetrics {
"service_method",
},
)
fullRecvChannelWaits := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "full_receive_channel_wait_seconds",
Help: "Time spent blocked on a full receive channel before the send completed, partitioned by service method.",
Buckets: latencyBuckets,
},
[]string{
"service_method",
},
)
dialFailures := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Expand Down Expand Up @@ -206,6 +219,7 @@ func newServerMetrics() *ServerMetrics {
prometheus.MustRegister(pendingDials)
prometheus.MustRegister(establishedConns)
prometheus.MustRegister(fullRecvChannels)
prometheus.MustRegister(fullRecvChannelWaits)
prometheus.MustRegister(dialFailures)
prometheus.MustRegister(streamPackets)
prometheus.MustRegister(streamErrors)
Expand All @@ -223,6 +237,7 @@ func newServerMetrics() *ServerMetrics {
pendingDials: pendingDials,
establishedConns: establishedConns,
fullRecvChannels: fullRecvChannels,
fullRecvChannelWaits: fullRecvChannelWaits,
dialFailures: dialFailures,
streamPackets: streamPackets,
streamErrors: streamErrors,
Expand All @@ -243,6 +258,7 @@ func (s *ServerMetrics) Reset() {
s.pendingDials.Reset()
s.establishedConns.Reset()
s.fullRecvChannels.Reset()
s.fullRecvChannelWaits.Reset()
s.dialFailures.Reset()
s.streamPackets.Reset()
s.streamErrors.Reset()
Expand Down Expand Up @@ -299,6 +315,11 @@ func (s *ServerMetrics) FullRecvChannel(serviceMethod string) prometheus.Gauge {
return s.fullRecvChannels.With(prometheus.Labels{"service_method": serviceMethod})
}

// ObserveFullRecvChannelWait records how long a send blocked on a full receive channel.
func (s *ServerMetrics) ObserveFullRecvChannelWait(serviceMethod string, elapsed time.Duration) {
s.fullRecvChannelWaits.With(prometheus.Labels{"service_method": serviceMethod}).Observe(elapsed.Seconds())
}

type DialFailureReason string

const (
Expand Down
12 changes: 10 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,9 @@ func (s *ProxyServer) readFrontendToChannel(frontend *GrpcFrontend, userAgent []
klog.V(2).InfoS("Receive channel from frontend is full", "userAgent", userAgent)
fullRecvChannelMetric := metrics.Metrics.FullRecvChannel(metrics.Proxy)
fullRecvChannelMetric.Inc()
start := time.Now()
recvCh <- in
metrics.Metrics.ObserveFullRecvChannelWait(metrics.Proxy, time.Since(start))
fullRecvChannelMetric.Dec()
}
}
Expand Down Expand Up @@ -833,7 +835,11 @@ func (s *ProxyServer) readBackendToChannel(backend *Backend, recvCh chan *client
klog.V(2).InfoS("Receive channel from agent is full", "agentID", agentID)
fullRecvChannelMetric := metrics.Metrics.FullRecvChannel(metrics.Connect)
fullRecvChannelMetric.Inc()
start := time.Now()
recvCh <- in
latency := time.Since(start)
metrics.Metrics.ObserveFullRecvChannelWait(metrics.Connect, latency)
klog.V(2).InfoS("Latency: Receive channel from agent is full", "agentID", agentID, "latency", latency.Milliseconds())
fullRecvChannelMetric.Dec()
}
}
Expand Down Expand Up @@ -982,7 +988,8 @@ func (s *ProxyServer) serveRecvBackend(backend *Backend, agentID string, recvCh
break
}
if err := frontend.send(pkt); err != nil {
klog.ErrorS(err, "send to client stream failure", "agentID", agentID, "connectionID", resp.ConnectID)
// Likely cause: Kube-apiserver already terminated the connection with k-server.
klog.V(4).InfoS("send to client stream failure (receiving DATA)", "err", err.Error(), "agentID", agentID, "connectionID", resp.ConnectID)
} else {
klog.V(5).InfoS("DATA sent to frontend")
}
Expand All @@ -993,7 +1000,8 @@ func (s *ProxyServer) serveRecvBackend(backend *Backend, agentID string, recvCh
klog.V(4).InfoS("Received data ACK from agent", "agentID", agentID, "connectionID", resp.ConnectID)
frontend, err := s.getFrontend(agentID, resp.ConnectID)
if err != nil {
klog.ErrorS(err, "could not get frontent client")
// Likely cause: Kube-apiserver already terminated the connection with k-server.
klog.V(4).InfoS("could not get frontend client (receiving DATA_ACK)", "err", err.Error(), "agentID", agentID, "connectionID", resp.ConnectID)
break
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/server/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
if err != nil {
klog.ErrorS(err, "Received failure on connection")
// Likely cause: Kube-apiserver already terminated the connection with k-server.
klog.V(4).InfoS("Received failure on connection: frontent likely closed connection", "err", err.Error(), "host", r.Host, "agentID", agentID, "connectionID", connID)
break
}

Expand All @@ -179,13 +181,15 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !acquired {
start := time.Now()

klog.InfoS("Semaphore full, waiting for client receive window > 0", "start", start.String(), "host", r.Host, "agentID", agentID, "connectionID", connID)
klog.V(4).InfoS("Semaphore full, waiting for client receive window > 0", "start", start.String(), "host", r.Host, "agentID", agentID, "connectionID", connID)
// Blocking: if semaphore is full (waits till server.go serveRecvBackend() - which receives packets via the grpc stream from an agent - receives
// an ACK packet which releases 1 from the semaphore.
connection.flow.Acquire(context.Background(), 1)
latency := time.Now().Sub(start)

klog.V(3).InfoS("Latency when waiting for client receive window > 0", "latency", latency.Milliseconds(), "start", start.String(), "host", r.Host, "agentID", agentID, "connectionID", connID)
if latency > time.Millisecond {
klog.V(2).InfoS("Latency when waiting for client receive window > 0", "latency", latency.Milliseconds(), "start", start.String(), "host", r.Host, "agentID", agentID, "connectionID", connID)
}
}

} else {
Expand Down
Loading