From 29737d1ea742af1a8d35e69669e9d67f3ede44ef Mon Sep 17 00:00:00 2001 From: Alex Smith Date: Mon, 6 Apr 2026 14:59:41 -0400 Subject: [PATCH 1/6] Dynatrace to RHOBS implementation --- CLAUDE.md | 2 +- .../etcddatabasequotalowspace.go | 63 ++++++------------- .../etcddatabasequotalowspace_test.go | 20 +++--- .../investigation/investigation.go | 50 +++++++-------- pkg/ocm/mock/ocmmock.go | 15 +++++ pkg/ocm/ocm.go | 25 ++++++++ 6 files changed, 96 insertions(+), 79 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 7f0e2ff7..0fce6a13 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -72,7 +72,7 @@ Pre-initialized clients available in investigation resources: - **K8s** (`pkg/k8s`) - Kubernetes API client - **osd-network-verifier** (`pkg/networkverifier`) - Network verification -For HCP clusters, when using `WithManagementRestConfig()`, `WithManagementK8sClient()`, or `WithManagementOCClient()`, the Dynatrace management cluster URL is automatically fetched and available in `r.DynatraceManagementClusterURL`. +For HCP clusters, when using `WithManagementRestConfig()`, `WithManagementK8sClient()`, or `WithManagementOCClient()`, the RHOBS cell endpoint is automatically fetched from the management cluster's external configuration labels and available in `r.RHOBSCell`. ### Workflow diff --git a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go index 056a0433..bdc2a585 100644 --- a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go +++ b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go @@ -3,7 +3,6 @@ package etcddatabasequotalowspace import ( "context" - "encoding/json" "fmt" "net/url" "strings" @@ -317,14 +316,14 @@ func (i *Investigation) runHCPEtcdAnalysis(ctx context.Context, rb investigation err = waitForJobCompletion(ctx, r.ManagementK8sClient, etcdAnalysisJob.Name, r.HCPNamespace, analysisJobTimeout) - // Add Dynatrace logs query URL to notes if available - if r.DynatraceManagementClusterURL != "" && r.ManagementClusterName != "" { - dynatraceLogsURL := buildDynatraceLogsURL( - r.DynatraceManagementClusterURL, + // Add RHOBS logs query URL to notes if available + if r.RHOBSCell != "" && r.ManagementClusterName != "" { + rhobsLogsURL := buildRHOBSLogsURL( + r.RHOBSCell, r.HCPNamespace, etcdAnalysisJob.Name, ) - r.Notes.AppendSuccess("Note: Click 'Show full note' to access the full URL. Logs may take up to 5 minutes to appear in Dynatrace.\n\nDynatrace Logs: %s", dynatraceLogsURL) + r.Notes.AppendSuccess("Note: Click 'Show full note' to access the full URL. Logs may take up to 5 minutes to appear in RHOBS.\n\nRHOBS Logs: %s", rhobsLogsURL) } if err != nil { @@ -576,41 +575,19 @@ func getEtcdctlContainerImage(pod *corev1.Pod) (string, error) { return "", fmt.Errorf("etcdctl container image not found in pod: %s", pod.Name) } -// buildDynatraceLogsURL constructs a Dynatrace UI URL with a DQL query for the analysis job logs -func buildDynatraceLogsURL(baseURL, namespace, jobId string) string { - query := fmt.Sprintf( - `fetch logs, from:now()-1h | filter matchesValue(event.type, "LOG") and (matchesValue(k8s.namespace.name, "%s")) and (matchesValue(k8s.pod.name, "%s*")) | sort timestamp desc | limit 1000`, - namespace, - jobId, - ) - - // Build the state object for Dynatrace logs UI - // The order of fields matters for some Dynatrace UI versions - state := map[string]interface{}{ - "version": 2, - "dt.timeframe": map[string]string{ - "from": "now()-30m", - "to": "now()", - }, - "tableConfig": map[string]interface{}{ - "columns": []string{"timestamp", "status", "Log message"}, - }, - "showDqlEditor": true, - "filterFieldQuery": query, - "dt.query": query, - "facetsCollapse": true, - } - - jsonBytes, err := json.Marshal(state) - if err != nil { - jsonBytes = []byte("{}") - logging.Warnf("failed to marshal Dynatrace state to JSON: %v", err) - } - - // URL encode the JSON state - // Note: QueryEscape uses + for spaces, but hash fragments need %20 - encodedState := url.QueryEscape(string(jsonBytes)) - encodedState = strings.ReplaceAll(encodedState, "+", "%20") - - return fmt.Sprintf("%sui/apps/dynatrace.logs/#%s", baseURL, encodedState) +// buildRHOBSLogsURL constructs a Grafana/RHOBS explore URL with a LogQL query for the analysis job logs +func buildRHOBSLogsURL(rhobsCell, namespace, jobId string) string { + // Build LogQL query to filter logs for the specific namespace and job + // LogQL syntax: {namespace="...", pod=~"..."} + logQLQuery := fmt.Sprintf(`{kubernetes_namespace_name="%s", kubernetes_pod_name=~"%s.*"}`, namespace, jobId) + + // Build Grafana explore URL with LogQL query + // Using relative time range of 30 minutes + // left parameter contains the datasource and query details + leftParam := url.QueryEscape(fmt.Sprintf( + `{"datasource":"Loki","queries":[{"refId":"A","expr":"%s","queryType":"range"}],"range":{"from":"now-30m","to":"now"}}`, + logQLQuery, + )) + + return fmt.Sprintf("https://%s/explore?left=%s", rhobsCell, leftParam) } diff --git a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace_test.go b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace_test.go index dee5d668..e5903227 100644 --- a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace_test.go +++ b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace_test.go @@ -277,12 +277,12 @@ func TestRunHCPEtcdAnalysis_Success(t *testing.T) { rb := &investigation.ResourceBuilderMock{ Resources: &investigation.Resources{ - Cluster: cluster, - ManagementK8sClient: fakeK8s, - HCPNamespace: "ocm-test-namespace", - ManagementClusterName: "test-management-cluster", - DynatraceManagementClusterURL: "https://hrm15629.apps.dynatrace.com/", - Notes: notewriter.New("etcddatabasequotalowspace_test", logging.RawLogger), + Cluster: cluster, + ManagementK8sClient: fakeK8s, + HCPNamespace: "ocm-test-namespace", + ManagementClusterName: "test-management-cluster", + RHOBSCell: "grafana.rhobs.example.com", + Notes: notewriter.New("etcddatabasequotalowspace_test", logging.RawLogger), }, } @@ -295,11 +295,11 @@ func TestRunHCPEtcdAnalysis_Success(t *testing.T) { assert.Contains(t, result.EtcdDatabaseAnalysis.Labels, "completed") assert.Len(t, result.Actions, 3) // NoteAndReportFrom (2 actions) + Escalate (1 action) - // Verify Dynatrace URL appears in notes + // Verify RHOBS URL appears in notes notesContent := rb.Resources.Notes.String() - assert.Contains(t, notesContent, "Dynatrace Logs:") - assert.Contains(t, notesContent, "https://hrm15629.apps.dynatrace.com/") - assert.Contains(t, notesContent, "ui/apps/dynatrace.logs/#") + assert.Contains(t, notesContent, "RHOBS Logs:") + assert.Contains(t, notesContent, "https://grafana.rhobs.example.com/") + assert.Contains(t, notesContent, "explore?left=") assert.Contains(t, notesContent, "ocm-test-namespace") assert.Contains(t, notesContent, "etcd-analysis-") // Job name prefix } diff --git a/pkg/investigations/investigation/investigation.go b/pkg/investigations/investigation/investigation.go index 51484d4b..e2d89d40 100644 --- a/pkg/investigations/investigation/investigation.go +++ b/pkg/investigations/investigation/investigation.go @@ -75,26 +75,26 @@ type Investigation interface { // Resources holds all resources/tools required for alert investigations type Resources struct { - Name string - Cluster *cmv1.Cluster - ClusterDeployment *hivev1.ClusterDeployment - AwsClient aws.Client - BpClient backplane.Client - RestConfig *backplane.RestConfig - K8sClient k8sclient.Client - OcmClient ocm.Client - PdClient pagerduty.Client - Notes *notewriter.NoteWriter - OCClient oc.Client - ManagementRestConfig *backplane.RestConfig - ManagementK8sClient k8sclient.Client - ManagementOCClient oc.Client - HCPNamespace string - HCNamespace string - IsHCP bool - IsInfrastructureCluster bool - ManagementClusterName string - DynatraceManagementClusterURL string + Name string + Cluster *cmv1.Cluster + ClusterDeployment *hivev1.ClusterDeployment + AwsClient aws.Client + BpClient backplane.Client + RestConfig *backplane.RestConfig + K8sClient k8sclient.Client + OcmClient ocm.Client + PdClient pagerduty.Client + Notes *notewriter.NoteWriter + OCClient oc.Client + ManagementRestConfig *backplane.RestConfig + ManagementK8sClient k8sclient.Client + ManagementOCClient oc.Client + HCPNamespace string + HCNamespace string + IsHCP bool + IsInfrastructureCluster bool + ManagementClusterName string + RHOBSCell string } type ResourceBuilder interface { @@ -367,7 +367,7 @@ func (r *ResourceBuilderT) buildManagementClusterResources() error { managementClusterName := hypershiftConfig.ManagementCluster() if managementClusterName == "" { - logging.Warnf("Management cluster name is empty, cannot fetch Dynatrace URL") + logging.Warnf("Management cluster name is empty, cannot fetch RHOBS cell") return nil } @@ -375,17 +375,17 @@ func (r *ResourceBuilderT) buildManagementClusterResources() error { managementCluster, err := r.ocmClient.GetClusterInfo(managementClusterName) if err != nil { - logging.Warnf("Failed to get management cluster info for Dynatrace URL: %v", err) + logging.Warnf("Failed to get management cluster info for RHOBS cell: %v", err) return nil } - dynatraceURL, err := r.ocmClient.GetDynatraceURL(managementCluster) + rhobsCell, err := r.ocmClient.GetRHOBSCell(managementCluster.ID()) if err != nil { - logging.Warnf("Failed to get Dynatrace URL: %v", err) + logging.Warnf("Failed to get RHOBS cell: %v", err) return nil } - r.builtResources.DynatraceManagementClusterURL = dynatraceURL + r.builtResources.RHOBSCell = rhobsCell return nil } diff --git a/pkg/ocm/mock/ocmmock.go b/pkg/ocm/mock/ocmmock.go index bc577b86..d8e33581 100644 --- a/pkg/ocm/mock/ocmmock.go +++ b/pkg/ocm/mock/ocmmock.go @@ -147,6 +147,21 @@ func (mr *MockClientMockRecorder) GetOrganizationID(clusterID any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrganizationID", reflect.TypeOf((*MockClient)(nil).GetOrganizationID), clusterID) } +// GetRHOBSCell mocks base method. +func (m *MockClient) GetRHOBSCell(clusterID string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRHOBSCell", clusterID) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetRHOBSCell indicates an expected call of GetRHOBSCell. +func (mr *MockClientMockRecorder) GetRHOBSCell(clusterID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRHOBSCell", reflect.TypeOf((*MockClient)(nil).GetRHOBSCell), clusterID) +} + // GetServiceLog mocks base method. func (m *MockClient) GetServiceLog(cluster *v1.Cluster, filter string) (*v10.ClusterLogsUUIDListResponse, error) { m.ctrl.T.Helper() diff --git a/pkg/ocm/ocm.go b/pkg/ocm/ocm.go index 5179109b..3e5a5ba5 100644 --- a/pkg/ocm/ocm.go +++ b/pkg/ocm/ocm.go @@ -50,6 +50,7 @@ type Client interface { GetClusterInfo(identifier string) (*cmv1.Cluster, error) IsManagingCluster(clusterID string) (bool, error) GetDynatraceURL(cluster *cmv1.Cluster) (string, error) + GetRHOBSCell(clusterID string) (string, error) } // SdkClient is the ocm client with which we can run the commands @@ -440,3 +441,27 @@ func (c *SdkClient) GetDynatraceURL(cluster *cmv1.Cluster) (string, error) { return "", errors.New("dynatrace tenant label not found in subscription") } + +// GetRHOBSCell retrieves the RHOBS cell endpoint from the management cluster's external configuration labels +func (c *SdkClient) GetRHOBSCell(clusterID string) (string, error) { + const rhobsCellLabel = "ext-hypershift.openshift.io/rhobs-cell" + + resp, err := c.conn.ClustersMgmt().V1().Clusters().Cluster(clusterID).ExternalConfiguration().Labels().List().Send() + if err != nil { + return "", fmt.Errorf("failed to fetch external configuration labels for cluster %s: %w", clusterID, err) + } + + for _, label := range resp.Items().Slice() { + key := label.Key() + value := label.Value() + + if key == rhobsCellLabel { + if value == "" { + return "", errors.New("rhobs-cell label is empty") + } + return value, nil + } + } + + return "", errors.New("rhobs-cell label not found in external configuration") +} From e9bb3c46d77a4f9785956c6e2fa1b6ef02528b95 Mon Sep 17 00:00:00 2001 From: Alex Smith Date: Mon, 6 Apr 2026 16:10:58 -0400 Subject: [PATCH 2/6] Query logs from rhobs directly --- CLAUDE.md | 4 +- pkg/controller/controller.go | 10 +- .../etcddatabasequotalowspace.go | 92 ++++- .../etcddatabasequotalowspace_test.go | 151 ++++++- .../investigation/investigation.go | 9 +- pkg/rhobs/mock/rhobsmock.go | 58 +++ pkg/rhobs/rhobs.go | 170 ++++++++ pkg/rhobs/rhobs_test.go | 376 ++++++++++++++++++ pkg/rhobs/types.go | 51 +++ test/set_stage_env.sh | 1 + 10 files changed, 900 insertions(+), 22 deletions(-) create mode 100644 pkg/rhobs/mock/rhobsmock.go create mode 100644 pkg/rhobs/rhobs.go create mode 100644 pkg/rhobs/rhobs_test.go create mode 100644 pkg/rhobs/types.go diff --git a/CLAUDE.md b/CLAUDE.md index 0fce6a13..200722bb 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -71,8 +71,9 @@ Pre-initialized clients available in investigation resources: - **PagerDuty** (`pkg/pagerduty`) - Alert info, incident management, notes - **K8s** (`pkg/k8s`) - Kubernetes API client - **osd-network-verifier** (`pkg/networkverifier`) - Network verification +- **RHOBS** (`pkg/rhobs`) - RHOBS Grafana Loki API for HCP log fetching -For HCP clusters, when using `WithManagementRestConfig()`, `WithManagementK8sClient()`, or `WithManagementOCClient()`, the RHOBS cell endpoint is automatically fetched from the management cluster's external configuration labels and available in `r.RHOBSCell`. +For HCP clusters, when using `WithManagementRestConfig()`, `WithManagementK8sClient()`, or `WithManagementOCClient()`, the RHOBS cell endpoint is automatically fetched from the management cluster's external configuration labels and available in `r.RHOBSCell`. A RHOBS client can be created using the `RHOBSCell` endpoint and `CAD_GRAFANA_TOKEN` to fetch logs from Loki. ### Workflow @@ -99,6 +100,7 @@ For local development (available via `source test/set_stage_env.sh`): - `PD_SIGNATURE` - PagerDuty webhook signature validation - `BACKPLANE_URL`, `BACKPLANE_INITIAL_ARN` - Backplane access - `CAD_PROMETHEUS_PUSHGATEWAY` - Metrics endpoint +- `CAD_GRAFANA_TOKEN` - Service account token for RHOBS Grafana/Loki API access (HCP log fetching) Optional: - `BACKPLANE_PROXY` - Required for local development diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index be66505c..973b7cac 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -79,6 +79,7 @@ type Dependencies struct { BackplaneURL string BackplaneProxy string AWSProxy string + GrafanaToken string ExperimentalEnabled bool } @@ -137,6 +138,12 @@ func initializeDependencies() (*Dependencies, error) { experimentalEnabledVar := os.Getenv("CAD_EXPERIMENTAL_ENABLED") experimentalEnabled, _ := strconv.ParseBool(experimentalEnabledVar) + // Load Grafana/RHOBS token for HCP log fetching + grafanaToken := os.Getenv("CAD_GRAFANA_TOKEN") + if grafanaToken == "" { + return nil, fmt.Errorf("missing required environment variable CAD_GRAFANA_TOKEN") + } + // Create OCM client ocmClient, err := ocm.New(ocmClientID, ocmClientSecret, ocmURL) if err != nil { @@ -160,6 +167,7 @@ func initializeDependencies() (*Dependencies, error) { BackplaneURL: backplaneURL, BackplaneProxy: backplaneProxy, AWSProxy: awsProxy, + GrafanaToken: grafanaToken, ExperimentalEnabled: experimentalEnabled, }, nil } @@ -250,7 +258,7 @@ func NewController(opts ControllerOptions, deps *Dependencies) (Controller, erro func (c *investigationRunner) runInvestigation(ctx context.Context, clusterId string, inv investigation.Investigation, pdClient *pagerduty.SdkClient) error { metrics.Inc(metrics.Alerts, inv.Name()) - builder, err := investigation.NewResourceBuilder(c.ocmClient, c.bpClient, clusterId, inv.Name(), c.dependencies.BackplaneURL) + builder, err := investigation.NewResourceBuilder(c.ocmClient, c.bpClient, clusterId, inv.Name(), c.dependencies.BackplaneURL, c.dependencies.GrafanaToken) if pdClient != nil { builder.WithPdClient(pdClient) } diff --git a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go index bdc2a585..982ba2e5 100644 --- a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go +++ b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go @@ -18,6 +18,7 @@ import ( "github.com/openshift/configuration-anomaly-detection/pkg/logging" "github.com/openshift/configuration-anomaly-detection/pkg/metrics" "github.com/openshift/configuration-anomaly-detection/pkg/notewriter" + "github.com/openshift/configuration-anomaly-detection/pkg/rhobs" "github.com/openshift/configuration-anomaly-detection/pkg/types" ) @@ -26,6 +27,15 @@ const ( etcdctlInitContainer = "reset-member" ) +// rhobsClientFactory is a factory function for creating RHOBS clients +// Can be overridden in tests for dependency injection +var rhobsClientFactory = func(baseURL, token string) (rhobs.Client, error) { + return rhobs.NewClient(rhobs.Config{ + BaseURL: baseURL, + Token: token, + }) +} + type Investigation struct{} // SnapshotResult contains information about the etcd snapshot that was taken @@ -315,17 +325,6 @@ func (i *Investigation) runHCPEtcdAnalysis(ctx context.Context, rb investigation r.Notes.AppendAutomation("Created HCP analysis job: %s in namespace %s", etcdAnalysisJob.Name, r.HCPNamespace) err = waitForJobCompletion(ctx, r.ManagementK8sClient, etcdAnalysisJob.Name, r.HCPNamespace, analysisJobTimeout) - - // Add RHOBS logs query URL to notes if available - if r.RHOBSCell != "" && r.ManagementClusterName != "" { - rhobsLogsURL := buildRHOBSLogsURL( - r.RHOBSCell, - r.HCPNamespace, - etcdAnalysisJob.Name, - ) - r.Notes.AppendSuccess("Note: Click 'Show full note' to access the full URL. Logs may take up to 5 minutes to appear in RHOBS.\n\nRHOBS Logs: %s", rhobsLogsURL) - } - if err != nil { if investigation.IsInfrastructureError(err) { return result, err @@ -343,6 +342,26 @@ func (i *Investigation) runHCPEtcdAnalysis(ctx context.Context, rb investigation return result, nil } + r.Notes.AppendSuccess("Analysis job completed successfully, fetching logs from RHOBS") + + // Fetch logs from RHOBS + logs, err := fetchRHOBSLogs(ctx, r, r.HCPNamespace, etcdAnalysisJob.Name) + if err != nil { + r.Notes.AppendWarning("Failed to fetch RHOBS logs: %v", err) + logging.Errorf("failed to fetch RHOBS logs: %v", err) + result.EtcdDatabaseAnalysis = investigation.InvestigationStep{ + Performed: true, + Labels: []string{"failure", "rhobs_logs_failed"}, + } + result.Actions = append( + executor.NoteAndReportFrom(r.Notes, r.Cluster.ID(), i.Name()), + executor.Escalate("Failed to fetch RHOBS logs - manual investigation required"), + ) + return result, nil + } + + r.Notes.AppendSuccess("Successfully fetched logs from RHOBS\n\n%s", logs) + result.EtcdDatabaseAnalysis = investigation.InvestigationStep{ Performed: true, Labels: []string{"success", "completed"}, @@ -350,7 +369,7 @@ func (i *Investigation) runHCPEtcdAnalysis(ctx context.Context, rb investigation result.Actions = append( executor.NoteAndReportFrom(r.Notes, r.Cluster.ID(), i.Name()), - executor.Escalate("HCP etcd analysis complete - see dynatrace logs for details"), + executor.Escalate("HCP etcd analysis complete - see logs above for details"), ) return result, nil } @@ -591,3 +610,52 @@ func buildRHOBSLogsURL(rhobsCell, namespace, jobId string) string { return fmt.Sprintf("https://%s/explore?left=%s", rhobsCell, leftParam) } + +// fetchRHOBSLogs fetches logs from RHOBS Grafana Loki for the analysis job +func fetchRHOBSLogs(ctx context.Context, r *investigation.Resources, namespace, jobName string) (string, error) { + // Validate prerequisites + if r.RHOBSCell == "" { + return "", fmt.Errorf("RHOBS cell endpoint not available") + } + if r.GrafanaToken == "" { + return "", fmt.Errorf("grafana token not available") + } + + logging.Infof("Fetching logs from RHOBS for job %s in namespace %s", jobName, namespace) + + // Create RHOBS client using factory (allows dependency injection in tests) + rhobsClient, err := rhobsClientFactory(fmt.Sprintf("https://%s", r.RHOBSCell), r.GrafanaToken) + if err != nil { + return "", fmt.Errorf("failed to create RHOBS client: %w", err) + } + + // Build LogQL query to filter logs for the specific namespace and job + logQLQuery := fmt.Sprintf(`{kubernetes_namespace_name="%s", kubernetes_pod_name=~"%s.*"}`, namespace, jobName) + + // Query logs from the last 30 minutes + now := time.Now() + start := now.Add(-30 * time.Minute) + + logging.Debugf("Querying RHOBS with LogQL: %s (time range: %s to %s)", logQLQuery, start.Format(time.RFC3339), now.Format(time.RFC3339)) + + result, err := rhobsClient.QueryLogs(ctx, logQLQuery, start, now, 1000) + if err != nil { + return "", fmt.Errorf("failed to query RHOBS logs: %w", err) + } + + if result.TotalLines == 0 { + logging.Warnf("No logs found in RHOBS for job %s", jobName) + return "No logs found in RHOBS for this job. Logs may take up to 5 minutes to appear.", nil + } + + logging.Infof("Successfully fetched %d log lines from RHOBS", result.TotalLines) + + // Format logs for display (limit to 100 lines for readability) + formattedLogs := rhobs.FormatLogsForDisplay(result, 100) + + // Also include the Grafana explore URL for reference + exploreURL := buildRHOBSLogsURL(r.RHOBSCell, namespace, jobName) + formattedLogs += fmt.Sprintf("\n\nView full logs in Grafana: %s", exploreURL) + + return formattedLogs, nil +} diff --git a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace_test.go b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace_test.go index e5903227..3375bb35 100644 --- a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace_test.go +++ b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -16,6 +17,8 @@ import ( "github.com/openshift/configuration-anomaly-detection/pkg/investigations/investigation" "github.com/openshift/configuration-anomaly-detection/pkg/logging" "github.com/openshift/configuration-anomaly-detection/pkg/notewriter" + "github.com/openshift/configuration-anomaly-detection/pkg/rhobs" + rhobsmock "github.com/openshift/configuration-anomaly-detection/pkg/rhobs/mock" ) func TestIsHCPCluster(t *testing.T) { @@ -214,6 +217,9 @@ func TestGetEtcdctlContainerImage(t *testing.T) { } func TestRunHCPEtcdAnalysis_Success(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + cluster, _ := cmv1.NewCluster(). ID("test-cluster-id"). ExternalID("external-cluster-id"). @@ -275,6 +281,33 @@ func TestRunHCPEtcdAnalysis_Success(t *testing.T) { } }() + // Create mock RHOBS client + mockRHOBSClient := rhobsmock.NewMockClient(ctrl) + + // Mock log entries to return + mockTimestamp := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + mockLogResult := &rhobs.LogQueryResult{ + Entries: []rhobs.LogEntry{ + {Timestamp: mockTimestamp, Line: "Starting etcd analysis"}, + {Timestamp: mockTimestamp.Add(time.Second), Line: "Analysis complete"}, + }, + TotalLines: 2, + StreamCount: 1, + } + + // Set up expectation for QueryLogs call + mockRHOBSClient.EXPECT(). + QueryLogs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), 1000). + Return(mockLogResult, nil). + Times(1) + + // Override the factory to return our mock + originalFactory := rhobsClientFactory + rhobsClientFactory = func(baseURL, token string) (rhobs.Client, error) { + return mockRHOBSClient, nil + } + defer func() { rhobsClientFactory = originalFactory }() + rb := &investigation.ResourceBuilderMock{ Resources: &investigation.Resources{ Cluster: cluster, @@ -282,6 +315,7 @@ func TestRunHCPEtcdAnalysis_Success(t *testing.T) { HCPNamespace: "ocm-test-namespace", ManagementClusterName: "test-management-cluster", RHOBSCell: "grafana.rhobs.example.com", + GrafanaToken: "test-token", Notes: notewriter.New("etcddatabasequotalowspace_test", logging.RawLogger), }, } @@ -295,13 +329,118 @@ func TestRunHCPEtcdAnalysis_Success(t *testing.T) { assert.Contains(t, result.EtcdDatabaseAnalysis.Labels, "completed") assert.Len(t, result.Actions, 3) // NoteAndReportFrom (2 actions) + Escalate (1 action) - // Verify RHOBS URL appears in notes + // Verify logs were fetched and included in notes + notesContent := rb.Resources.Notes.String() + assert.Contains(t, notesContent, "Successfully fetched logs from RHOBS") + assert.Contains(t, notesContent, "Starting etcd analysis") + assert.Contains(t, notesContent, "Analysis complete") + assert.Contains(t, notesContent, "View full logs in Grafana") + assert.Contains(t, notesContent, "https://grafana.rhobs.example.com/explore") +} + +func TestRunHCPEtcdAnalysis_RHOBSFetchFailure(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cluster, _ := cmv1.NewCluster(). + ID("test-cluster-id"). + ExternalID("external-cluster-id"). + Hypershift(cmv1.NewHypershift().Enabled(true)). + Build() + + etcdPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "etcd-0", + Namespace: "ocm-test-namespace", + Labels: map[string]string{ + "k8s-app": "etcd", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + InitContainers: []corev1.Container{ + { + Name: "reset-member", + Image: "quay.io/openshift/etcd:v4.15", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + + fakeK8s := fake.NewClientBuilder(). + WithObjects(etcdPod). + WithStatusSubresource(&batchv1.Job{}). + Build() + + ctx := t.Context() + + go func() { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + jobList := &batchv1.JobList{} + if err := fakeK8s.List(ctx, jobList, client.InNamespace("ocm-test-namespace")); err != nil { + continue + } + + for i := range jobList.Items { + job := &jobList.Items[i] + if job.Status.Succeeded == 0 { + job.Status.Succeeded = 1 + _ = fakeK8s.Status().Update(ctx, job) + } + } + } + } + }() + + // Create mock RHOBS client that returns an error + mockRHOBSClient := rhobsmock.NewMockClient(ctrl) + mockRHOBSClient.EXPECT(). + QueryLogs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), 1000). + Return(nil, assert.AnError). + Times(1) + + // Override the factory to return our mock + originalFactory := rhobsClientFactory + rhobsClientFactory = func(baseURL, token string) (rhobs.Client, error) { + return mockRHOBSClient, nil + } + defer func() { rhobsClientFactory = originalFactory }() + + rb := &investigation.ResourceBuilderMock{ + Resources: &investigation.Resources{ + Cluster: cluster, + ManagementK8sClient: fakeK8s, + HCPNamespace: "ocm-test-namespace", + ManagementClusterName: "test-management-cluster", + RHOBSCell: "grafana.rhobs.example.com", + GrafanaToken: "test-token", + Notes: notewriter.New("etcddatabasequotalowspace_test", logging.RawLogger), + }, + } + + inv := &Investigation{} + result, err := inv.runHCPEtcdAnalysis(ctx, rb) + + // Investigation should handle RHOBS failure gracefully but mark as failed + assert.NoError(t, err) + assert.True(t, result.EtcdDatabaseAnalysis.Performed) + assert.Contains(t, result.EtcdDatabaseAnalysis.Labels, "failure") + assert.Contains(t, result.EtcdDatabaseAnalysis.Labels, "rhobs_logs_failed") + assert.Len(t, result.Actions, 3) + + // Verify error message appears in notes notesContent := rb.Resources.Notes.String() - assert.Contains(t, notesContent, "RHOBS Logs:") - assert.Contains(t, notesContent, "https://grafana.rhobs.example.com/") - assert.Contains(t, notesContent, "explore?left=") - assert.Contains(t, notesContent, "ocm-test-namespace") - assert.Contains(t, notesContent, "etcd-analysis-") // Job name prefix + assert.Contains(t, notesContent, "Failed to fetch RHOBS logs") } func TestRunHCPEtcdAnalysis_NoEtcdPod(t *testing.T) { diff --git a/pkg/investigations/investigation/investigation.go b/pkg/investigations/investigation/investigation.go index e2d89d40..38057d15 100644 --- a/pkg/investigations/investigation/investigation.go +++ b/pkg/investigations/investigation/investigation.go @@ -48,15 +48,18 @@ func NewResourceBuilder( clusterId string, name string, backplaneUrl string, + grafanaToken string, ) (ResourceBuilder, error) { rb := &ResourceBuilderT{ clusterId: clusterId, name: name, ocmClient: ocmClient, backplaneUrl: backplaneUrl, + grafanaToken: grafanaToken, builtResources: &Resources{ - BpClient: bpClient, - OcmClient: ocmClient, + BpClient: bpClient, + OcmClient: ocmClient, + GrafanaToken: grafanaToken, }, } @@ -95,6 +98,7 @@ type Resources struct { IsInfrastructureCluster bool ManagementClusterName string RHOBSCell string + GrafanaToken string } type ResourceBuilder interface { @@ -129,6 +133,7 @@ type ResourceBuilderT struct { logLevel string pipelineName string backplaneUrl string + grafanaToken string ocmClient *ocm.SdkClient diff --git a/pkg/rhobs/mock/rhobsmock.go b/pkg/rhobs/mock/rhobsmock.go new file mode 100644 index 00000000..9c592caa --- /dev/null +++ b/pkg/rhobs/mock/rhobsmock.go @@ -0,0 +1,58 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/rhobs/rhobs.go +// +// Generated by this command: +// +// mockgen -source=pkg/rhobs/rhobs.go -destination=pkg/rhobs/mock/rhobsmock.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + time "time" + + rhobs "github.com/openshift/configuration-anomaly-detection/pkg/rhobs" + gomock "go.uber.org/mock/gomock" +) + +// MockClient is a mock of Client interface. +type MockClient struct { + ctrl *gomock.Controller + recorder *MockClientMockRecorder + isgomock struct{} +} + +// MockClientMockRecorder is the mock recorder for MockClient. +type MockClientMockRecorder struct { + mock *MockClient +} + +// NewMockClient creates a new mock instance. +func NewMockClient(ctrl *gomock.Controller) *MockClient { + mock := &MockClient{ctrl: ctrl} + mock.recorder = &MockClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockClient) EXPECT() *MockClientMockRecorder { + return m.recorder +} + +// QueryLogs mocks base method. +func (m *MockClient) QueryLogs(ctx context.Context, logQLQuery string, start, end time.Time, limit int) (*rhobs.LogQueryResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QueryLogs", ctx, logQLQuery, start, end, limit) + ret0, _ := ret[0].(*rhobs.LogQueryResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// QueryLogs indicates an expected call of QueryLogs. +func (mr *MockClientMockRecorder) QueryLogs(ctx, logQLQuery, start, end, limit any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryLogs", reflect.TypeOf((*MockClient)(nil).QueryLogs), ctx, logQLQuery, start, end, limit) +} diff --git a/pkg/rhobs/rhobs.go b/pkg/rhobs/rhobs.go new file mode 100644 index 00000000..cc4e3abf --- /dev/null +++ b/pkg/rhobs/rhobs.go @@ -0,0 +1,170 @@ +// Package rhobs provides a client for querying RHOBS Grafana Loki API +package rhobs + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "time" +) + +// Client provides methods for interacting with RHOBS Grafana Loki API +type Client interface { + // QueryLogs queries Loki for logs matching the given LogQL query within the specified time range + QueryLogs(ctx context.Context, logQLQuery string, start, end time.Time, limit int) (*LogQueryResult, error) +} + +// ClientImpl implements the Client interface +type ClientImpl struct { + httpClient *http.Client + baseURL string + token string +} + +// Config contains configuration for creating a new RHOBS client +type Config struct { + BaseURL string + Token string +} + +// NewClient creates a new RHOBS client for querying Grafana Loki +func NewClient(config Config) (Client, error) { + if config.BaseURL == "" { + return nil, fmt.Errorf("BaseURL is required") + } + if config.Token == "" { + return nil, fmt.Errorf("token is required") + } + + return &ClientImpl{ + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + baseURL: config.BaseURL, + token: config.Token, + }, nil +} + +// QueryLogs queries Loki for logs matching the given LogQL query within the specified time range +func (c *ClientImpl) QueryLogs(ctx context.Context, logQLQuery string, start, end time.Time, limit int) (*LogQueryResult, error) { + // Build query parameters + params := url.Values{} + params.Add("query", logQLQuery) + params.Add("start", strconv.FormatInt(start.UnixNano(), 10)) + params.Add("end", strconv.FormatInt(end.UnixNano(), 10)) + if limit > 0 { + params.Add("limit", strconv.Itoa(limit)) + } + + // Construct URL + queryURL := fmt.Sprintf("%s/loki/api/v1/query_range?%s", c.baseURL, params.Encode()) + + // Create request + req, err := http.NewRequestWithContext(ctx, http.MethodGet, queryURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + // Add authentication header + req.Header.Set("Authorization", "Bearer "+c.token) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "configuration-anomaly-detection") + + // Execute request + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute request: %w", err) + } + defer func() { + _ = resp.Body.Close() + }() + + // Read response body + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + // Check HTTP status + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body)) + } + + // Parse response + var queryResp QueryRangeResponse + if err := json.Unmarshal(body, &queryResp); err != nil { + return nil, fmt.Errorf("failed to unmarshal response: %w", err) + } + + if queryResp.Status != "success" { + return nil, fmt.Errorf("query failed with status: %s", queryResp.Status) + } + + // Parse and format results + result := parseQueryResponse(&queryResp) + return result, nil +} + +// parseQueryResponse converts the Loki API response into a structured LogQueryResult +func parseQueryResponse(resp *QueryRangeResponse) *LogQueryResult { + result := &LogQueryResult{ + Entries: make([]LogEntry, 0), + StreamCount: len(resp.Data.Result), + } + + for _, stream := range resp.Data.Result { + for _, value := range stream.Values { + if len(value) < 2 { + continue + } + + // Parse timestamp (nanoseconds since epoch) + timestampNano, err := strconv.ParseInt(value[0], 10, 64) + if err != nil { + continue + } + timestamp := time.Unix(0, timestampNano) + + // Create log entry + entry := LogEntry{ + Timestamp: timestamp, + Line: value[1], + Labels: stream.Stream, + } + result.Entries = append(result.Entries, entry) + } + } + + result.TotalLines = len(result.Entries) + return result +} + +// FormatLogsForDisplay formats log entries into a human-readable string +func FormatLogsForDisplay(result *LogQueryResult, maxLines int) string { + if result == nil || len(result.Entries) == 0 { + return "No logs found" + } + + var output string + output += fmt.Sprintf("Found %d log entries from %d streams\n\n", result.TotalLines, result.StreamCount) + + displayCount := len(result.Entries) + if maxLines > 0 && displayCount > maxLines { + displayCount = maxLines + } + + for i := 0; i < displayCount; i++ { + entry := result.Entries[i] + output += fmt.Sprintf("[%s] %s\n", entry.Timestamp.Format(time.RFC3339), entry.Line) + } + + if len(result.Entries) > displayCount { + output += fmt.Sprintf("\n... and %d more lines (truncated for display)\n", len(result.Entries)-displayCount) + } + + return output +} diff --git a/pkg/rhobs/rhobs_test.go b/pkg/rhobs/rhobs_test.go new file mode 100644 index 00000000..9e708f76 --- /dev/null +++ b/pkg/rhobs/rhobs_test.go @@ -0,0 +1,376 @@ +package rhobs + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewClient(t *testing.T) { + tests := []struct { + name string + config Config + expectError bool + errorMsg string + }{ + { + name: "valid config", + config: Config{ + BaseURL: "https://grafana.example.com", + Token: "test-token", + }, + expectError: false, + }, + { + name: "missing base URL", + config: Config{ + Token: "test-token", + }, + expectError: true, + errorMsg: "BaseURL is required", + }, + { + name: "missing token", + config: Config{ + BaseURL: "https://grafana.example.com", + }, + expectError: true, + errorMsg: "token is required", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewClient(tt.config) + + if tt.expectError { + assert.Error(t, err) + assert.Contains(t, err.Error(), tt.errorMsg) + assert.Nil(t, client) + } else { + assert.NoError(t, err) + assert.NotNil(t, client) + } + }) + } +} + +func TestQueryLogs_Success(t *testing.T) { + // Create a test server that returns mock Loki response + mockResponse := QueryRangeResponse{ + Status: "success", + Data: QueryRangeResult{ + ResultType: "streams", + Result: []Stream{ + { + Stream: map[string]string{ + "kubernetes_namespace_name": "test-namespace", + "kubernetes_pod_name": "test-pod-123", + }, + Values: [][]string{ + {"1704067200000000000", "First log line"}, + {"1704067201000000000", "Second log line"}, + {"1704067202000000000", "Third log line"}, + }, + }, + }, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify request + assert.Equal(t, "GET", r.Method) + assert.Equal(t, "/loki/api/v1/query_range", r.URL.Path) + assert.Equal(t, "Bearer test-token", r.Header.Get("Authorization")) + + // Verify query parameters + query := r.URL.Query() + assert.NotEmpty(t, query.Get("query")) + assert.NotEmpty(t, query.Get("start")) + assert.NotEmpty(t, query.Get("end")) + assert.Equal(t, "100", query.Get("limit")) + + // Return mock response + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(mockResponse) + })) + defer server.Close() + + client, err := NewClient(Config{ + BaseURL: server.URL, + Token: "test-token", + }) + require.NoError(t, err) + + ctx := context.Background() + end := time.Now() + start := end.Add(-30 * time.Minute) + + result, err := client.QueryLogs(ctx, `{namespace="test"}`, start, end, 100) + + assert.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, 3, result.TotalLines) + assert.Equal(t, 1, result.StreamCount) + assert.Len(t, result.Entries, 3) + assert.Equal(t, "First log line", result.Entries[0].Line) + assert.Equal(t, "Second log line", result.Entries[1].Line) + assert.Equal(t, "Third log line", result.Entries[2].Line) +} + +func TestQueryLogs_HTTPError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte("Unauthorized")) + })) + defer server.Close() + + client, err := NewClient(Config{ + BaseURL: server.URL, + Token: "invalid-token", + }) + require.NoError(t, err) + + ctx := context.Background() + end := time.Now() + start := end.Add(-30 * time.Minute) + + result, err := client.QueryLogs(ctx, `{namespace="test"}`, start, end, 100) + + assert.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "401") +} + +func TestQueryLogs_InvalidJSON(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("invalid json")) + })) + defer server.Close() + + client, err := NewClient(Config{ + BaseURL: server.URL, + Token: "test-token", + }) + require.NoError(t, err) + + ctx := context.Background() + end := time.Now() + start := end.Add(-30 * time.Minute) + + result, err := client.QueryLogs(ctx, `{namespace="test"}`, start, end, 100) + + assert.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "failed to unmarshal response") +} + +func TestQueryLogs_FailedStatus(t *testing.T) { + mockResponse := QueryRangeResponse{ + Status: "error", + Data: QueryRangeResult{}, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(mockResponse) + })) + defer server.Close() + + client, err := NewClient(Config{ + BaseURL: server.URL, + Token: "test-token", + }) + require.NoError(t, err) + + ctx := context.Background() + end := time.Now() + start := end.Add(-30 * time.Minute) + + result, err := client.QueryLogs(ctx, `{namespace="test"}`, start, end, 100) + + assert.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "query failed with status: error") +} + +func TestQueryLogs_EmptyResult(t *testing.T) { + mockResponse := QueryRangeResponse{ + Status: "success", + Data: QueryRangeResult{ + ResultType: "streams", + Result: []Stream{}, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(mockResponse) + })) + defer server.Close() + + client, err := NewClient(Config{ + BaseURL: server.URL, + Token: "test-token", + }) + require.NoError(t, err) + + ctx := context.Background() + end := time.Now() + start := end.Add(-30 * time.Minute) + + result, err := client.QueryLogs(ctx, `{namespace="test"}`, start, end, 100) + + assert.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, 0, result.TotalLines) + assert.Equal(t, 0, result.StreamCount) +} + +func TestFormatLogsForDisplay_NoLogs(t *testing.T) { + result := &LogQueryResult{ + Entries: []LogEntry{}, + TotalLines: 0, + StreamCount: 0, + } + + output := FormatLogsForDisplay(result, 10) + assert.Equal(t, "No logs found", output) +} + +func TestFormatLogsForDisplay_WithLogs(t *testing.T) { + timestamp := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + result := &LogQueryResult{ + Entries: []LogEntry{ + {Timestamp: timestamp, Line: "Log line 1"}, + {Timestamp: timestamp.Add(time.Second), Line: "Log line 2"}, + {Timestamp: timestamp.Add(2 * time.Second), Line: "Log line 3"}, + }, + TotalLines: 3, + StreamCount: 1, + } + + output := FormatLogsForDisplay(result, 10) + assert.Contains(t, output, "Found 3 log entries from 1 streams") + assert.Contains(t, output, "Log line 1") + assert.Contains(t, output, "Log line 2") + assert.Contains(t, output, "Log line 3") +} + +func TestFormatLogsForDisplay_Truncation(t *testing.T) { + timestamp := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + entries := make([]LogEntry, 10) + for i := 0; i < 10; i++ { + entries[i] = LogEntry{ + Timestamp: timestamp.Add(time.Duration(i) * time.Second), + Line: "Log line", + } + } + + result := &LogQueryResult{ + Entries: entries, + TotalLines: 10, + StreamCount: 1, + } + + output := FormatLogsForDisplay(result, 5) + assert.Contains(t, output, "Found 10 log entries from 1 streams") + assert.Contains(t, output, "... and 5 more lines (truncated for display)") +} + +func TestFormatLogsForDisplay_NilResult(t *testing.T) { + output := FormatLogsForDisplay(nil, 10) + assert.Equal(t, "No logs found", output) +} + +func TestParseQueryResponse_MultipleStreams(t *testing.T) { + resp := &QueryRangeResponse{ + Status: "success", + Data: QueryRangeResult{ + ResultType: "streams", + Result: []Stream{ + { + Stream: map[string]string{"pod": "pod1"}, + Values: [][]string{ + {"1704067200000000000", "Pod 1 log line"}, + }, + }, + { + Stream: map[string]string{"pod": "pod2"}, + Values: [][]string{ + {"1704067201000000000", "Pod 2 log line"}, + }, + }, + }, + }, + } + + result := parseQueryResponse(resp) + + assert.Equal(t, 2, result.TotalLines) + assert.Equal(t, 2, result.StreamCount) + assert.Len(t, result.Entries, 2) + assert.Equal(t, "Pod 1 log line", result.Entries[0].Line) + assert.Equal(t, "Pod 2 log line", result.Entries[1].Line) +} + +func TestParseQueryResponse_InvalidTimestamp(t *testing.T) { + resp := &QueryRangeResponse{ + Status: "success", + Data: QueryRangeResult{ + ResultType: "streams", + Result: []Stream{ + { + Stream: map[string]string{"pod": "pod1"}, + Values: [][]string{ + {"invalid-timestamp", "This should be skipped"}, + {"1704067200000000000", "This should be included"}, + }, + }, + }, + }, + } + + result := parseQueryResponse(resp) + + // Only one entry should be parsed successfully + assert.Equal(t, 1, result.TotalLines) + assert.Len(t, result.Entries, 1) + assert.Equal(t, "This should be included", result.Entries[0].Line) +} + +func TestParseQueryResponse_MalformedValues(t *testing.T) { + resp := &QueryRangeResponse{ + Status: "success", + Data: QueryRangeResult{ + ResultType: "streams", + Result: []Stream{ + { + Stream: map[string]string{"pod": "pod1"}, + Values: [][]string{ + {"1704067200000000000"}, // Missing log line + {"1704067201000000000", "Valid log line"}, + }, + }, + }, + }, + } + + result := parseQueryResponse(resp) + + // Only the valid entry should be parsed + assert.Equal(t, 1, result.TotalLines) + assert.Len(t, result.Entries, 1) + assert.Equal(t, "Valid log line", result.Entries[0].Line) +} diff --git a/pkg/rhobs/types.go b/pkg/rhobs/types.go new file mode 100644 index 00000000..289a7578 --- /dev/null +++ b/pkg/rhobs/types.go @@ -0,0 +1,51 @@ +// Package rhobs provides a client for querying RHOBS Grafana Loki API +package rhobs + +import "time" + +// QueryRangeResponse represents the response from Loki's /loki/api/v1/query_range endpoint +type QueryRangeResponse struct { + Status string `json:"status"` + Data QueryRangeResult `json:"data"` +} + +// QueryRangeResult contains the result data from a Loki query +type QueryRangeResult struct { + ResultType string `json:"resultType"` + Result []Stream `json:"result"` + Stats Stats `json:"stats,omitempty"` +} + +// Stream represents a log stream with its labels and values +type Stream struct { + Stream map[string]string `json:"stream"` + Values [][]string `json:"values"` +} + +// Stats contains query statistics +type Stats struct { + Summary Summary `json:"summary,omitempty"` +} + +// Summary contains summary statistics +type Summary struct { + BytesProcessedPerSecond int `json:"bytesProcessedPerSecond,omitempty"` + LinesProcessedPerSecond int `json:"linesProcessedPerSecond,omitempty"` + TotalBytesProcessed int `json:"totalBytesProcessed,omitempty"` + TotalLinesProcessed int `json:"totalLinesProcessed,omitempty"` + ExecTime float64 `json:"execTime,omitempty"` +} + +// LogEntry represents a single log entry with timestamp and line +type LogEntry struct { + Timestamp time.Time + Line string + Labels map[string]string +} + +// LogQueryResult represents the parsed result of a log query +type LogQueryResult struct { + Entries []LogEntry + TotalLines int + StreamCount int +} diff --git a/test/set_stage_env.sh b/test/set_stage_env.sh index fe07df79..3145b4bc 100755 --- a/test/set_stage_env.sh +++ b/test/set_stage_env.sh @@ -6,6 +6,7 @@ export VAULT_TOKEN="$(vault login -method=oidc -token-only)" for v in $(vault kv get -format=json osd-sre/configuration-anomaly-detection/backplane/stg | jq -r ".data.data|to_entries|map(\"\(.key)=\(.value|tostring)\")|.[]"); do export $v; done for v in $(vault kv get -format=json osd-sre/configuration-anomaly-detection/ocm/ocm-cad-staging | jq -r ".data.data|to_entries|map(\"\(.key)=\(.value|tostring)\")|.[]"); do export $v; done for v in $(vault kv get -format=json osd-sre/configuration-anomaly-detection/pd/stg | jq -r ".data.data|to_entries|map(\"\(.key)=\(.value|tostring)\")|.[]"); do export $v; done +for v in $(vault kv get -format=json osd-sre/configuration-anomaly-detection/grafana/stg | jq -r ".data.data|to_entries|map(\"\(.key)=\(.value|tostring)\")|.[]"); do export $v; done unset VAULT_ADDR VAULT_TOKEN From 91037e5c2e52709735843613bda532314bc1d08e Mon Sep 17 00:00:00 2001 From: Alex Smith Date: Wed, 8 Apr 2026 13:59:08 -0400 Subject: [PATCH 3/6] Update rhobsclient to be available in ResourceBuilder interface --- .../investigation/investigation.go | 100 ++++++++++++++---- 1 file changed, 82 insertions(+), 18 deletions(-) diff --git a/pkg/investigations/investigation/investigation.go b/pkg/investigations/investigation/investigation.go index 38057d15..a04bf60f 100644 --- a/pkg/investigations/investigation/investigation.go +++ b/pkg/investigations/investigation/investigation.go @@ -18,6 +18,7 @@ import ( "github.com/openshift/configuration-anomaly-detection/pkg/oc" "github.com/openshift/configuration-anomaly-detection/pkg/ocm" "github.com/openshift/configuration-anomaly-detection/pkg/pagerduty" + "github.com/openshift/configuration-anomaly-detection/pkg/rhobs" "github.com/openshift/configuration-anomaly-detection/pkg/types" ) @@ -92,6 +93,7 @@ type Resources struct { ManagementRestConfig *backplane.RestConfig ManagementK8sClient k8sclient.Client ManagementOCClient oc.Client + RHOBSClient rhobs.Client HCPNamespace string HCNamespace string IsHCP bool @@ -113,6 +115,7 @@ type ResourceBuilder interface { WithManagementRestConfig() ResourceBuilder WithManagementK8sClient() ResourceBuilder WithManagementOCClient() ResourceBuilder + WithRHOBSClient() ResourceBuilder Build() (*Resources, error) } @@ -127,6 +130,7 @@ type ResourceBuilderT struct { buildManagementRestConfig bool buildManagementK8sClient bool buildManagementOCClient bool + buildRHOBSClient bool clusterId string name string @@ -205,6 +209,12 @@ func (r *ResourceBuilderT) WithManagementOCClient() ResourceBuilder { return r } +func (r *ResourceBuilderT) WithRHOBSClient() ResourceBuilder { + r.WithManagementRestConfig() + r.buildRHOBSClient = true + return r +} + func (r *ResourceBuilderT) Build() (*Resources, error) { if r.buildErr != nil { // Return whatever managed to build + an error. this might allow some subset of checks to proceed. @@ -216,26 +226,13 @@ func (r *ResourceBuilderT) Build() (*Resources, error) { var err error - if r.buildCluster && r.builtResources.Cluster == nil { - r.builtResources.Cluster, err = r.ocmClient.GetClusterInfo(r.clusterId) + // Build cluster resource if requested + if r.buildCluster { + err = r.buildClusterResource() if err != nil { - // Let the caller handle how to respond to this error. - r.buildErr = ClusterNotFoundError{ClusterID: r.clusterId, Err: err} + r.buildErr = err return r.builtResources, r.buildErr } - - // Check if this is an infra cluster (hive, management or service) - internalID := r.builtResources.Cluster.ID() - isManaging, err := r.ocmClient.IsManagingCluster(internalID) - if err != nil { - logging.Warnf("Failed to check if cluster %s is a managing cluster: %v. Assuming it IS a managing cluster (fail-closed).", internalID, err) - r.builtResources.IsInfrastructureCluster = true - } else { - r.builtResources.IsInfrastructureCluster = isManaging - if isManaging { - logging.Infof("Cluster %s is an infrastructure cluster (hive, management, or service cluster)", internalID) - } - } } if r.buildNotes && r.builtResources.Notes == nil { @@ -286,7 +283,7 @@ func (r *ResourceBuilderT) Build() (*Resources, error) { } // Check if this is an HCP cluster and build management cluster resources if requested - if r.buildManagementRestConfig || r.buildManagementOCClient || r.buildManagementK8sClient { + if r.buildManagementRestConfig || r.buildManagementOCClient || r.buildManagementK8sClient || r.buildRHOBSClient { err = r.buildManagementClusterResources() if err != nil { r.buildErr = err @@ -294,6 +291,15 @@ func (r *ResourceBuilderT) Build() (*Resources, error) { } } + // Build RHOBS client if requested + if r.buildRHOBSClient { + err = r.buildRHOBSClientResource() + if err != nil { + r.buildErr = err + return r.builtResources, r.buildErr + } + } + return r.builtResources, nil } @@ -394,6 +400,60 @@ func (r *ResourceBuilderT) buildManagementClusterResources() error { return nil } +// buildClusterResource fetches cluster information and determines if it's an infrastructure cluster +func (r *ResourceBuilderT) buildClusterResource() error { + if r.builtResources.Cluster != nil { + return nil + } + + cluster, err := r.ocmClient.GetClusterInfo(r.clusterId) + if err != nil { + return ClusterNotFoundError{ClusterID: r.clusterId, Err: err} + } + r.builtResources.Cluster = cluster + + // Check if this is an infra cluster (hive, management or service) + internalID := cluster.ID() + isManaging, err := r.ocmClient.IsManagingCluster(internalID) + if err != nil { + logging.Warnf("Failed to check if cluster %s is a managing cluster: %v. Assuming it IS a managing cluster (fail-closed).", internalID, err) + r.builtResources.IsInfrastructureCluster = true + } else { + r.builtResources.IsInfrastructureCluster = isManaging + if isManaging { + logging.Infof("Cluster %s is an infrastructure cluster (hive, management, or service cluster)", internalID) + } + } + + return nil +} + +// buildRHOBSClientResource creates a RHOBS client for fetching logs from RHOBS/Loki +func (r *ResourceBuilderT) buildRHOBSClientResource() error { + if r.builtResources.RHOBSClient != nil { + return nil + } + + if r.builtResources.RHOBSCell == "" { + return fmt.Errorf("RHOBS cell endpoint not available") + } + if r.grafanaToken == "" { + return fmt.Errorf("grafana token not available") + } + + logging.Infof("Creating RHOBS client for cell: %s", r.builtResources.RHOBSCell) + client, err := rhobs.NewClient(rhobs.Config{ + BaseURL: r.builtResources.RHOBSCell, + Token: r.grafanaToken, + }) + if err != nil { + return fmt.Errorf("failed to create RHOBS client: %w", err) + } + + r.builtResources.RHOBSClient = client + return nil +} + // This is an implementation to be used in tests, but putting it into a _test.go file will make it not resolvable. type ResourceBuilderMock struct { Resources *Resources @@ -445,6 +505,10 @@ func (r *ResourceBuilderMock) WithManagementOCClient() ResourceBuilder { return r } +func (r *ResourceBuilderMock) WithRHOBSClient() ResourceBuilder { + return r +} + func (r *ResourceBuilderMock) Build() (*Resources, error) { if r.BuildError != nil { return nil, r.BuildError From 649d86940e08360e8e05947cda663a78651771bb Mon Sep 17 00:00:00 2001 From: Alex Smith Date: Wed, 8 Apr 2026 13:59:25 -0400 Subject: [PATCH 4/6] Implement ResourceBuilder rhobsclient interface --- .../etcddatabasequotalowspace.go | 27 ++++--------------- .../etcddatabasequotalowspace_test.go | 16 ++--------- 2 files changed, 7 insertions(+), 36 deletions(-) diff --git a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go index 982ba2e5..7fc62a1f 100644 --- a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go +++ b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go @@ -27,15 +27,6 @@ const ( etcdctlInitContainer = "reset-member" ) -// rhobsClientFactory is a factory function for creating RHOBS clients -// Can be overridden in tests for dependency injection -var rhobsClientFactory = func(baseURL, token string) (rhobs.Client, error) { - return rhobs.NewClient(rhobs.Config{ - BaseURL: baseURL, - Token: token, - }) -} - type Investigation struct{} // SnapshotResult contains information about the etcd snapshot that was taken @@ -238,6 +229,7 @@ func (i *Investigation) runHCPEtcdAnalysis(ctx context.Context, rb investigation r, err := rb. WithManagementRestConfig(). WithManagementK8sClient(). + WithRHOBSClient(). Build() if err != nil { if msg, ok := investigation.ClusterAccessErrorMessage(err); ok { @@ -613,22 +605,13 @@ func buildRHOBSLogsURL(rhobsCell, namespace, jobId string) string { // fetchRHOBSLogs fetches logs from RHOBS Grafana Loki for the analysis job func fetchRHOBSLogs(ctx context.Context, r *investigation.Resources, namespace, jobName string) (string, error) { - // Validate prerequisites - if r.RHOBSCell == "" { - return "", fmt.Errorf("RHOBS cell endpoint not available") - } - if r.GrafanaToken == "" { - return "", fmt.Errorf("grafana token not available") + // Validate RHOBS client is available (built by resource builder) + if r.RHOBSClient == nil { + return "", fmt.Errorf("RHOBS client not available") } logging.Infof("Fetching logs from RHOBS for job %s in namespace %s", jobName, namespace) - // Create RHOBS client using factory (allows dependency injection in tests) - rhobsClient, err := rhobsClientFactory(fmt.Sprintf("https://%s", r.RHOBSCell), r.GrafanaToken) - if err != nil { - return "", fmt.Errorf("failed to create RHOBS client: %w", err) - } - // Build LogQL query to filter logs for the specific namespace and job logQLQuery := fmt.Sprintf(`{kubernetes_namespace_name="%s", kubernetes_pod_name=~"%s.*"}`, namespace, jobName) @@ -638,7 +621,7 @@ func fetchRHOBSLogs(ctx context.Context, r *investigation.Resources, namespace, logging.Debugf("Querying RHOBS with LogQL: %s (time range: %s to %s)", logQLQuery, start.Format(time.RFC3339), now.Format(time.RFC3339)) - result, err := rhobsClient.QueryLogs(ctx, logQLQuery, start, now, 1000) + result, err := r.RHOBSClient.QueryLogs(ctx, logQLQuery, start, now, 1000) if err != nil { return "", fmt.Errorf("failed to query RHOBS logs: %w", err) } diff --git a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace_test.go b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace_test.go index 3375bb35..8fe73dc9 100644 --- a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace_test.go +++ b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace_test.go @@ -301,13 +301,6 @@ func TestRunHCPEtcdAnalysis_Success(t *testing.T) { Return(mockLogResult, nil). Times(1) - // Override the factory to return our mock - originalFactory := rhobsClientFactory - rhobsClientFactory = func(baseURL, token string) (rhobs.Client, error) { - return mockRHOBSClient, nil - } - defer func() { rhobsClientFactory = originalFactory }() - rb := &investigation.ResourceBuilderMock{ Resources: &investigation.Resources{ Cluster: cluster, @@ -315,6 +308,7 @@ func TestRunHCPEtcdAnalysis_Success(t *testing.T) { HCPNamespace: "ocm-test-namespace", ManagementClusterName: "test-management-cluster", RHOBSCell: "grafana.rhobs.example.com", + RHOBSClient: mockRHOBSClient, GrafanaToken: "test-token", Notes: notewriter.New("etcddatabasequotalowspace_test", logging.RawLogger), }, @@ -409,13 +403,6 @@ func TestRunHCPEtcdAnalysis_RHOBSFetchFailure(t *testing.T) { Return(nil, assert.AnError). Times(1) - // Override the factory to return our mock - originalFactory := rhobsClientFactory - rhobsClientFactory = func(baseURL, token string) (rhobs.Client, error) { - return mockRHOBSClient, nil - } - defer func() { rhobsClientFactory = originalFactory }() - rb := &investigation.ResourceBuilderMock{ Resources: &investigation.Resources{ Cluster: cluster, @@ -423,6 +410,7 @@ func TestRunHCPEtcdAnalysis_RHOBSFetchFailure(t *testing.T) { HCPNamespace: "ocm-test-namespace", ManagementClusterName: "test-management-cluster", RHOBSCell: "grafana.rhobs.example.com", + RHOBSClient: mockRHOBSClient, GrafanaToken: "test-token", Notes: notewriter.New("etcddatabasequotalowspace_test", logging.RawLogger), }, From 5ee72cf08fc9f6766c1fd3a831f1fd18dd09563c Mon Sep 17 00:00:00 2001 From: Alex Smith Date: Wed, 8 Apr 2026 14:08:00 -0400 Subject: [PATCH 5/6] Improvements --- .../investigation/investigation.go | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/pkg/investigations/investigation/investigation.go b/pkg/investigations/investigation/investigation.go index a04bf60f..d7dcd157 100644 --- a/pkg/investigations/investigation/investigation.go +++ b/pkg/investigations/investigation/investigation.go @@ -378,25 +378,11 @@ func (r *ResourceBuilderT) buildManagementClusterResources() error { managementClusterName := hypershiftConfig.ManagementCluster() if managementClusterName == "" { - logging.Warnf("Management cluster name is empty, cannot fetch RHOBS cell") + logging.Warnf("Management cluster name is empty") return nil } r.builtResources.ManagementClusterName = managementClusterName - - managementCluster, err := r.ocmClient.GetClusterInfo(managementClusterName) - if err != nil { - logging.Warnf("Failed to get management cluster info for RHOBS cell: %v", err) - return nil - } - - rhobsCell, err := r.ocmClient.GetRHOBSCell(managementCluster.ID()) - if err != nil { - logging.Warnf("Failed to get RHOBS cell: %v", err) - return nil - } - - r.builtResources.RHOBSCell = rhobsCell return nil } @@ -434,13 +420,30 @@ func (r *ResourceBuilderT) buildRHOBSClientResource() error { return nil } - if r.builtResources.RHOBSCell == "" { - return fmt.Errorf("RHOBS cell endpoint not available") - } if r.grafanaToken == "" { return fmt.Errorf("grafana token not available") } + // Fetch RHOBS cell if not already set + if r.builtResources.RHOBSCell == "" { + if r.builtResources.ManagementClusterName == "" { + return fmt.Errorf("management cluster name not available - cannot determine RHOBS cell") + } + + logging.Infof("Fetching RHOBS cell for management cluster: %s", r.builtResources.ManagementClusterName) + managementCluster, err := r.ocmClient.GetClusterInfo(r.builtResources.ManagementClusterName) + if err != nil { + return fmt.Errorf("failed to get management cluster info for RHOBS cell: %w", err) + } + + rhobsCell, err := r.ocmClient.GetRHOBSCell(managementCluster.ID()) + if err != nil { + return fmt.Errorf("failed to get RHOBS cell: %w", err) + } + + r.builtResources.RHOBSCell = rhobsCell + } + logging.Infof("Creating RHOBS client for cell: %s", r.builtResources.RHOBSCell) client, err := rhobs.NewClient(rhobs.Config{ BaseURL: r.builtResources.RHOBSCell, From 5fbcfd165495eb38f9420ee7c37e4625ffaaf290 Mon Sep 17 00:00:00 2001 From: Alex Smith Date: Mon, 13 Apr 2026 14:52:13 -0400 Subject: [PATCH 6/6] Cleanup --- .../etcddatabasequotalowspace.go | 10 ----- .../investigation/investigation.go | 5 --- pkg/rhobs/rhobs.go | 42 ++++++++----------- 3 files changed, 17 insertions(+), 40 deletions(-) diff --git a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go index 7fc62a1f..2311cfc0 100644 --- a/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go +++ b/pkg/investigations/etcddatabasequotalowspace/etcddatabasequotalowspace.go @@ -588,13 +588,8 @@ func getEtcdctlContainerImage(pod *corev1.Pod) (string, error) { // buildRHOBSLogsURL constructs a Grafana/RHOBS explore URL with a LogQL query for the analysis job logs func buildRHOBSLogsURL(rhobsCell, namespace, jobId string) string { - // Build LogQL query to filter logs for the specific namespace and job - // LogQL syntax: {namespace="...", pod=~"..."} logQLQuery := fmt.Sprintf(`{kubernetes_namespace_name="%s", kubernetes_pod_name=~"%s.*"}`, namespace, jobId) - // Build Grafana explore URL with LogQL query - // Using relative time range of 30 minutes - // left parameter contains the datasource and query details leftParam := url.QueryEscape(fmt.Sprintf( `{"datasource":"Loki","queries":[{"refId":"A","expr":"%s","queryType":"range"}],"range":{"from":"now-30m","to":"now"}}`, logQLQuery, @@ -605,17 +600,14 @@ func buildRHOBSLogsURL(rhobsCell, namespace, jobId string) string { // fetchRHOBSLogs fetches logs from RHOBS Grafana Loki for the analysis job func fetchRHOBSLogs(ctx context.Context, r *investigation.Resources, namespace, jobName string) (string, error) { - // Validate RHOBS client is available (built by resource builder) if r.RHOBSClient == nil { return "", fmt.Errorf("RHOBS client not available") } logging.Infof("Fetching logs from RHOBS for job %s in namespace %s", jobName, namespace) - // Build LogQL query to filter logs for the specific namespace and job logQLQuery := fmt.Sprintf(`{kubernetes_namespace_name="%s", kubernetes_pod_name=~"%s.*"}`, namespace, jobName) - // Query logs from the last 30 minutes now := time.Now() start := now.Add(-30 * time.Minute) @@ -633,10 +625,8 @@ func fetchRHOBSLogs(ctx context.Context, r *investigation.Resources, namespace, logging.Infof("Successfully fetched %d log lines from RHOBS", result.TotalLines) - // Format logs for display (limit to 100 lines for readability) formattedLogs := rhobs.FormatLogsForDisplay(result, 100) - // Also include the Grafana explore URL for reference exploreURL := buildRHOBSLogsURL(r.RHOBSCell, namespace, jobName) formattedLogs += fmt.Sprintf("\n\nView full logs in Grafana: %s", exploreURL) diff --git a/pkg/investigations/investigation/investigation.go b/pkg/investigations/investigation/investigation.go index d7dcd157..314ffc84 100644 --- a/pkg/investigations/investigation/investigation.go +++ b/pkg/investigations/investigation/investigation.go @@ -420,11 +420,6 @@ func (r *ResourceBuilderT) buildRHOBSClientResource() error { return nil } - if r.grafanaToken == "" { - return fmt.Errorf("grafana token not available") - } - - // Fetch RHOBS cell if not already set if r.builtResources.RHOBSCell == "" { if r.builtResources.ManagementClusterName == "" { return fmt.Errorf("management cluster name not available - cannot determine RHOBS cell") diff --git a/pkg/rhobs/rhobs.go b/pkg/rhobs/rhobs.go index cc4e3abf..a3159d26 100644 --- a/pkg/rhobs/rhobs.go +++ b/pkg/rhobs/rhobs.go @@ -1,4 +1,3 @@ -// Package rhobs provides a client for querying RHOBS Grafana Loki API package rhobs import ( @@ -9,29 +8,30 @@ import ( "net/http" "net/url" "strconv" + "strings" "time" ) -// Client provides methods for interacting with RHOBS Grafana Loki API +const ( + defaultHTTPTimeout = 30 * time.Second + maxErrorBodyLength = 500 +) + type Client interface { - // QueryLogs queries Loki for logs matching the given LogQL query within the specified time range QueryLogs(ctx context.Context, logQLQuery string, start, end time.Time, limit int) (*LogQueryResult, error) } -// ClientImpl implements the Client interface type ClientImpl struct { httpClient *http.Client baseURL string token string } -// Config contains configuration for creating a new RHOBS client type Config struct { BaseURL string Token string } -// NewClient creates a new RHOBS client for querying Grafana Loki func NewClient(config Config) (Client, error) { if config.BaseURL == "" { return nil, fmt.Errorf("BaseURL is required") @@ -42,7 +42,7 @@ func NewClient(config Config) (Client, error) { return &ClientImpl{ httpClient: &http.Client{ - Timeout: 30 * time.Second, + Timeout: defaultHTTPTimeout, }, baseURL: config.BaseURL, token: config.Token, @@ -51,7 +51,6 @@ func NewClient(config Config) (Client, error) { // QueryLogs queries Loki for logs matching the given LogQL query within the specified time range func (c *ClientImpl) QueryLogs(ctx context.Context, logQLQuery string, start, end time.Time, limit int) (*LogQueryResult, error) { - // Build query parameters params := url.Values{} params.Add("query", logQLQuery) params.Add("start", strconv.FormatInt(start.UnixNano(), 10)) @@ -60,21 +59,17 @@ func (c *ClientImpl) QueryLogs(ctx context.Context, logQLQuery string, start, en params.Add("limit", strconv.Itoa(limit)) } - // Construct URL queryURL := fmt.Sprintf("%s/loki/api/v1/query_range?%s", c.baseURL, params.Encode()) - // Create request req, err := http.NewRequestWithContext(ctx, http.MethodGet, queryURL, nil) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } - // Add authentication header req.Header.Set("Authorization", "Bearer "+c.token) req.Header.Set("Content-Type", "application/json") req.Header.Set("User-Agent", "configuration-anomaly-detection") - // Execute request resp, err := c.httpClient.Do(req) if err != nil { return nil, fmt.Errorf("failed to execute request: %w", err) @@ -83,18 +78,19 @@ func (c *ClientImpl) QueryLogs(ctx context.Context, logQLQuery string, start, en _ = resp.Body.Close() }() - // Read response body body, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("failed to read response body: %w", err) } - // Check HTTP status if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body)) + errorBody := string(body) + if len(errorBody) > maxErrorBodyLength { + errorBody = errorBody[:maxErrorBodyLength] + "..." + } + return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, errorBody) } - // Parse response var queryResp QueryRangeResponse if err := json.Unmarshal(body, &queryResp); err != nil { return nil, fmt.Errorf("failed to unmarshal response: %w", err) @@ -104,7 +100,6 @@ func (c *ClientImpl) QueryLogs(ctx context.Context, logQLQuery string, start, en return nil, fmt.Errorf("query failed with status: %s", queryResp.Status) } - // Parse and format results result := parseQueryResponse(&queryResp) return result, nil } @@ -122,14 +117,12 @@ func parseQueryResponse(resp *QueryRangeResponse) *LogQueryResult { continue } - // Parse timestamp (nanoseconds since epoch) timestampNano, err := strconv.ParseInt(value[0], 10, 64) if err != nil { continue } timestamp := time.Unix(0, timestampNano) - // Create log entry entry := LogEntry{ Timestamp: timestamp, Line: value[1], @@ -143,14 +136,13 @@ func parseQueryResponse(resp *QueryRangeResponse) *LogQueryResult { return result } -// FormatLogsForDisplay formats log entries into a human-readable string func FormatLogsForDisplay(result *LogQueryResult, maxLines int) string { if result == nil || len(result.Entries) == 0 { return "No logs found" } - var output string - output += fmt.Sprintf("Found %d log entries from %d streams\n\n", result.TotalLines, result.StreamCount) + var output strings.Builder + fmt.Fprintf(&output, "Found %d log entries from %d streams\n\n", result.TotalLines, result.StreamCount) displayCount := len(result.Entries) if maxLines > 0 && displayCount > maxLines { @@ -159,12 +151,12 @@ func FormatLogsForDisplay(result *LogQueryResult, maxLines int) string { for i := 0; i < displayCount; i++ { entry := result.Entries[i] - output += fmt.Sprintf("[%s] %s\n", entry.Timestamp.Format(time.RFC3339), entry.Line) + fmt.Fprintf(&output, "[%s] %s\n", entry.Timestamp.Format(time.RFC3339), entry.Line) } if len(result.Entries) > displayCount { - output += fmt.Sprintf("\n... and %d more lines (truncated for display)\n", len(result.Entries)-displayCount) + fmt.Fprintf(&output, "\n... and %d more lines (truncated for display)\n", len(result.Entries)-displayCount) } - return output + return output.String() }