From 2e56492dda10b9624e9005ab343df4a1aba3ef87 Mon Sep 17 00:00:00 2001 From: lucca Date: Wed, 3 Jun 2026 15:56:29 -0300 Subject: [PATCH 1/2] feat(scc): migrate to SCC v2 API and add new finding tools Upgrade client from securitycenter v1 to securitycenter_v2. Update all parent paths to include /locations/{location}. Add search_findings, get_finding_details, and search_findings_by_compliance tools. Fix attackExposure score to read from nested v2 message structure. Also add .DS_Store to .gitignore. --- .gitignore | 3 + server/scc/README.md | 37 ++++ server/scc/scc_mcp.py | 445 +++++++++++++++++++++++++++++++++++++++++- 3 files changed, 475 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index a614800b..c30927bf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# macOS +.DS_Store + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/server/scc/README.md b/server/scc/README.md index d7cdbe28..d684c61e 100644 --- a/server/scc/README.md +++ b/server/scc/README.md @@ -6,6 +6,43 @@ This is an MCP (Model Context Protocol) server for interacting with Google Cloud ### Available Tools +- **`search_findings(project_id, finding_class=None, severity=None, state="ACTIVE", category=None, ...)`** + - **Description**: Searches and lists ALL types of Security Command Center findings with flexible filtering. Returns full finding details including descriptions, remediation steps, severity, attack exposure, and all associated metadata. + - **Parameters**: + - `project_id` (required): The Google Cloud project ID (e.g., 'my-gcp-project'). + - `finding_class` (optional): Filter by finding class. Valid values: `VULNERABILITY`, `THREAT`, `MISCONFIGURATION`, `OBSERVATION`, `SCC_ERROR`, `POSTURE_VIOLATION`, `TOXIC_COMBINATION`, `SENSITIVE_DATA_RISK`, `CHOKEPOINT`. Supports OR (e.g., `'THREAT OR MISCONFIGURATION'`). + - `severity` (optional): Filter by severity: `CRITICAL`, `HIGH`, `MEDIUM`, `LOW`. Supports OR (e.g., `'HIGH OR CRITICAL'`). + - `state` (optional): Filter by state: `ACTIVE`, `INACTIVE`. Defaults to `ACTIVE`. Set to `None` for all states. + - `category` (optional): Filter by finding category (e.g., `PUBLIC_BUCKET_ACL`, `XSS`, `OPEN_FIREWALL`). + - `resource_name` (optional): Filter by the full resource name associated with the finding. + - `resource_type` (optional): Filter by resource type (e.g., `google.compute.Instance`). + - `mute` (optional): Filter by mute status: `MUTED`, `UNMUTED`, `UNDEFINED`. + - `custom_filter` (optional): Raw SCC filter string appended via AND for advanced filtering. + - `max_findings` (optional): Maximum number of findings to return. Defaults to 50. + - `location` (optional): Google Cloud location for SCC v2. Defaults to `global`. + - `order_by` (optional): Ordering of results. Defaults to `event_time desc`. + +- **`get_finding_details(project_id, finding_id, location="global", include_resource_details=True)`** + - **Description**: Gets the full details of a specific finding by its ID, including description, remediation steps, severity, attack exposure, compliance information, MITRE ATT&CK data, vulnerability details, and optionally the affected resource details from Cloud Asset Inventory (CAI). Works for any finding class. + - **Parameters**: + - `project_id` (required): The Google Cloud project ID. + - `finding_id` (required): The ID of the finding to retrieve. + - `location` (optional): Google Cloud location for SCC v2. Defaults to `global`. + - `include_resource_details` (optional): Whether to fetch resource details from CAI. Defaults to `True`. + +- **`search_findings_by_compliance(project_id, search_text=None, compliance_standard=None, compliance_version=None, compliance_id=None, ...)`** + - **Description**: Searches SCC findings by compliance framework information (CIS benchmarks, PCI DSS, NIST 800-53, ISO 27001, etc.) or by free-text search on finding descriptions and categories. Use when you have a compliance control name or description (e.g., 'ServiceAccount should not have Admin privileges') and want to find the corresponding SCC findings. + - **Parameters**: + - `project_id` (required): The Google Cloud project ID. + - `search_text` (optional): Free-text to search across finding descriptions, categories, and compliance standard names (case-insensitive). Examples: `'ServiceAccount should not have Admin privileges'`, `'log metric filter'`, `'MFA'`. + - `compliance_standard` (optional): Filter by compliance standard name (case-insensitive partial match). Examples: `'CIS'`, `'PCI DSS'`, `'NIST 800-53'`. + - `compliance_version` (optional): Filter by standard version (exact match). Examples: `'1.3.0'`, `'2.0'`. + - `compliance_id` (optional): Filter by control ID (exact match). Examples: `'1.5'`, `'4.1'`. + - `severity` (optional): Pre-filter by severity. Supports OR (e.g., `'HIGH OR CRITICAL'`). + - `state` (optional): Filter by state. Defaults to `ACTIVE`. + - `max_findings` (optional): Maximum findings to return. Defaults to 50. + - `location` (optional): Google Cloud location for SCC v2. Defaults to `global`. + - **`top_vulnerability_findings(project_id, max_findings=20)`** - **Description**: Lists the top ACTIVE, HIGH or CRITICAL severity findings of class VULNERABILITY for a specific project, sorted by Attack Exposure Score (descending). Includes the Attack Exposure score in the output if available. Aids prioritization for remediation. - **Parameters**: diff --git a/server/scc/scc_mcp.py b/server/scc/scc_mcp.py index 7a7cc123..efc2d7b3 100644 --- a/server/scc/scc_mcp.py +++ b/server/scc/scc_mcp.py @@ -17,7 +17,7 @@ from google.api_core import exceptions as google_exceptions from google.cloud import asset_v1 -from google.cloud import securitycenter +from google.cloud import securitycenter_v2 from google.protobuf import json_format from mcp.server.fastmcp import FastMCP @@ -41,8 +41,8 @@ # Ensure ADC are configured in the environment where the server runs # (e.g., by running `gcloud auth application-default login`). try: - scc_client = securitycenter.SecurityCenterClient() - logger.info("Successfully initialized Google Cloud Security Center Client.") + scc_client = securitycenter_v2.SecurityCenterClient() + logger.info("Successfully initialized Google Cloud Security Center v2 Client.") except Exception as e: logger.error(f"Failed to initialize Security Center Client: {e}", exc_info=True) # Depending on requirements, you might want to exit or prevent tool registration @@ -70,10 +70,429 @@ def proto_message_to_dict(message: Any) -> Dict[str, Any]: # --- Security Command Center Tools --- +@mcp.tool() +async def search_findings( + project_id: str, + finding_class: str = None, + severity: str = None, + state: str = "ACTIVE", + category: str = None, + resource_name: str = None, + resource_type: str = None, + mute: str = None, + custom_filter: str = None, + max_findings: int = 50, + location: str = "global", + order_by: str = "event_time desc", +) -> Dict[str, Any]: + """Name: search_findings + + Description: Searches and lists ALL types of Security Command Center findings for a specific project + with flexible filtering. Returns full finding details including descriptions, remediation + steps, severity, attack exposure, and all associated metadata. Supports filtering by + finding class (VULNERABILITY, THREAT, MISCONFIGURATION, OBSERVATION, SCC_ERROR, + POSTURE_VIOLATION, TOXIC_COMBINATION, SENSITIVE_DATA_RISK, CHOKEPOINT), severity, + state, category, resource name/type, and mute status. + Parameters: + project_id (required): The Google Cloud project ID (e.g., 'my-gcp-project'). + finding_class (optional): Filter by finding class. Valid values: VULNERABILITY, THREAT, + MISCONFIGURATION, OBSERVATION, SCC_ERROR, POSTURE_VIOLATION, TOXIC_COMBINATION, + SENSITIVE_DATA_RISK, CHOKEPOINT. Can combine with OR (e.g., 'THREAT OR MISCONFIGURATION'). + severity (optional): Filter by severity. Valid values: CRITICAL, HIGH, MEDIUM, LOW. + Can combine with OR (e.g., 'HIGH OR CRITICAL'). + state (optional): Filter by state. Valid values: ACTIVE, INACTIVE. Defaults to 'ACTIVE'. + Set to None or empty string to search all states. + category (optional): Filter by finding category (e.g., 'PUBLIC_BUCKET_ACL', 'XSS', 'SQL_INJECTION', + 'OPEN_FIREWALL', 'MFA_NOT_ENFORCED', etc.). + resource_name (optional): Filter by the full resource name associated with the finding. + resource_type (optional): Filter by resource type (e.g., 'google.compute.Instance', 'google.storage.Bucket'). + mute (optional): Filter by mute status. Valid values: MUTED, UNMUTED, UNDEFINED. + custom_filter (optional): A raw SCC filter string that will be appended to any other filters using AND. + Use this for advanced filtering not covered by other parameters. + max_findings (optional): Maximum number of findings to return. Defaults to 50. + location (optional): The Google Cloud location for SCC v2 (e.g., 'global', 'us-central1'). Defaults to 'global'. + order_by (optional): Ordering of results. Defaults to 'event_time desc'. Other options include + 'severity desc', 'create_time desc'. + """ + if not scc_client: + return {"error": "Security Center Client not initialized."} + + # Build filter string from parameters + filter_parts = [] + + if state: + filter_parts.append(f'state="{state}"') + + if finding_class: + # Support multiple classes with OR + if " OR " in finding_class.upper(): + classes = [c.strip() for c in finding_class.upper().split(" OR ")] + class_filter = " OR ".join([f'findingClass="{c}"' for c in classes]) + filter_parts.append(f"({class_filter})") + else: + filter_parts.append(f'findingClass="{finding_class.upper()}"') + + if severity: + # Support multiple severities with OR + if " OR " in severity.upper(): + sevs = [s.strip() for s in severity.upper().split(" OR ")] + sev_filter = " OR ".join([f'severity="{s}"' for s in sevs]) + filter_parts.append(f"({sev_filter})") + else: + filter_parts.append(f'severity="{severity.upper()}"') + + if category: + filter_parts.append(f'category="{category}"') + + if resource_name: + filter_parts.append(f'resourceName="{resource_name}"') + + if resource_type: + filter_parts.append(f'resource.type="{resource_type}"') + + if mute: + filter_parts.append(f'mute="{mute.upper()}"') + + if custom_filter: + filter_parts.append(custom_filter) + + filter_str = " AND ".join(filter_parts) if filter_parts else "" + + # SCC v2: parent must include /locations/{location} + parent = f"projects/{project_id}/sources/-/locations/{location}" + + logger.info(f"Searching findings for project: {project_id}, filter: {filter_str}") + + try: + request_args = { + "parent": parent, + "filter": filter_str, + "page_size": min(max_findings, 1000), + "order_by": order_by, + } + + response_pager = scc_client.list_findings(request=request_args) + + all_findings = [] + count = 0 + for page in response_pager.pages: + for item in page.list_findings_results: + if count >= max_findings: + break + finding_dict = proto_message_to_dict(item.finding) + all_findings.append(finding_dict) + count += 1 + if count >= max_findings: + break + + # Check if more findings exist + more_findings_exist = count >= max_findings + + return { + "findings": all_findings, + "count": len(all_findings), + "filter_applied": filter_str if filter_str else "No filter (all findings)", + "more_findings_may_exist": more_findings_exist, + } + + except google_exceptions.NotFound as e: + logger.error(f"Project or resource not found for search on {parent}: {e}") + return {"error": "Not Found", "details": f"Could not find project '{project_id}' or relevant resources. {str(e)}"} + except google_exceptions.PermissionDenied as e: + logger.error(f"Permission denied for search on {parent}: {e}") + return {"error": "Permission Denied", "details": str(e)} + except google_exceptions.InvalidArgument as e: + logger.error(f"Invalid argument (check filter syntax?) for search on {parent}: {e}") + return {"error": "Invalid Argument", "details": str(e)} + except Exception as e: + logger.error(f"An unexpected error occurred searching findings: {e}", exc_info=True) + return {"error": "An unexpected error occurred", "details": str(e)} + + +@mcp.tool() +async def get_finding_details( + project_id: str, + finding_id: str, + location: str = "global", + include_resource_details: bool = True, +) -> Dict[str, Any]: + """Name: get_finding_details + + Description: Gets the full details of a specific finding by its finding ID, including description, + remediation steps, severity, attack exposure, compliance information, MITRE ATT&CK data, + vulnerability details, and optionally the affected resource details from Cloud Asset + Inventory (CAI). Works for any finding class (VULNERABILITY, THREAT, MISCONFIGURATION, etc.). + Parameters: + project_id (required): The Google Cloud project ID (e.g., 'my-gcp-project'). + finding_id (required): The ID of the finding to retrieve. + location (optional): The Google Cloud location for SCC v2 (e.g., 'global', 'us-central1'). Defaults to 'global'. + include_resource_details (optional): Whether to fetch additional resource details from Cloud Asset + Inventory. Defaults to True. + """ + if not scc_client: + return {"error": "Security Center Client not initialized."} + + parent = f"projects/{project_id}/sources/-/locations/{location}" + finding_name_filter = f"projects/{project_id}/sources/-/locations/{location}/findings/{finding_id}" + filter_str = f'name="{finding_name_filter}"' + + logger.info(f"Getting details for finding {finding_id} in project {project_id}") + + try: + scc_request_args = { + "parent": parent, + "filter": filter_str, + "page_size": 1, + } + + response_pager = scc_client.list_findings(request=scc_request_args) + first_finding = None + page = next(iter(response_pager.pages), None) + if page and page.list_findings_results: + results = list(page.list_findings_results) + if results: + first_finding = results[0].finding + + if not first_finding: + return {"error": "Finding not found", "details": f"No finding with ID '{finding_id}' found in project '{project_id}'."} + + finding_dict = proto_message_to_dict(first_finding) + + result = { + "finding": finding_dict, + "finding_id": finding_id, + } + + # Optionally fetch resource details from CAI + resource_name_from_finding = finding_dict.get("resourceName") + if include_resource_details and resource_name_from_finding and cai_client: + try: + cai_scope = f"projects/{project_id}" + cai_request = asset_v1.SearchAllResourcesRequest( + scope=cai_scope, + query=f'name="{resource_name_from_finding}"', + page_size=1, + ) + cai_response = cai_client.search_all_resources(request=cai_request) + asset_result = next(iter(cai_response), None) + if asset_result: + result["resource_details_cai"] = proto_message_to_dict(asset_result) + logger.info(f"Successfully fetched CAI details for {resource_name_from_finding}") + else: + result["resource_details_cai"] = {"warning": "Resource not found in CAI.", "resource_name": resource_name_from_finding} + except Exception as cai_e: + logger.error(f"Error fetching CAI details: {cai_e}") + result["resource_details_cai"] = {"error": "Failed to fetch resource details from CAI.", "details": str(cai_e)} + elif include_resource_details and not cai_client: + result["resource_details_cai"] = {"error": "Cloud Asset Inventory Client not initialized."} + + return result + + except google_exceptions.NotFound as e: + logger.error(f"Resource not found: {e}") + return {"error": "Not Found", "details": str(e)} + except google_exceptions.PermissionDenied as e: + logger.error(f"Permission denied: {e}") + return {"error": "Permission Denied", "details": str(e)} + except google_exceptions.InvalidArgument as e: + logger.error(f"Invalid argument: {e}") + return {"error": "Invalid Argument", "details": str(e)} + except Exception as e: + logger.error(f"Unexpected error getting finding details: {e}", exc_info=True) + return {"error": "An unexpected error occurred", "details": str(e)} + + +@mcp.tool() +async def search_findings_by_compliance( + project_id: str, + search_text: str = None, + compliance_standard: str = None, + compliance_version: str = None, + compliance_id: str = None, + severity: str = None, + state: str = "ACTIVE", + max_findings: int = 50, + location: str = "global", +) -> Dict[str, Any]: + """Name: search_findings_by_compliance + + Description: Searches Security Command Center findings by compliance framework information (e.g., CIS benchmarks, + PCI DSS, NIST 800-53, ISO 27001) or by free-text search on finding descriptions and categories. + Use this tool when you have a compliance control name or description (e.g., + 'ServiceAccount should not have Admin privileges', 'Ensure log metric filter and alerts exist') + and want to find the corresponding SCC findings. Also supports filtering by specific compliance + standard name, version, and control ID. + Parameters: + project_id (required): The Google Cloud project ID (e.g., 'my-gcp-project'). + search_text (optional): Free-text to search across finding descriptions, categories, and compliance standard names. + Matches if any of these fields contain the search text (case-insensitive). Examples: + 'ServiceAccount should not have Admin privileges', 'log metric filter', 'MFA', 'firewall'. + compliance_standard (optional): Filter by compliance standard name (case-insensitive partial match). + Examples: 'CIS', 'CIS Google Cloud Platform', 'PCI DSS', 'NIST 800-53', 'ISO 27001'. + compliance_version (optional): Filter by compliance standard version (exact match). Examples: '1.3.0', '2.0'. + compliance_id (optional): Filter by compliance control ID (exact match). Examples: '1.5', '4.1', '6.2.1'. + severity (optional): Pre-filter by severity at the API level. Valid values: CRITICAL, HIGH, MEDIUM, LOW. + Supports OR (e.g., 'HIGH OR CRITICAL'). + state (optional): Filter by state. Valid values: ACTIVE, INACTIVE. Defaults to 'ACTIVE'. + Set to None or empty string to search all states. + max_findings (optional): Maximum number of matching findings to return. Defaults to 50. + location (optional): The Google Cloud location for SCC v2. Defaults to 'global'. + """ + if not scc_client: + return {"error": "Security Center Client not initialized."} + + if not search_text and not compliance_standard and not compliance_id: + return { + "error": "Missing parameters", + "details": "At least one of search_text, compliance_standard, or compliance_id must be provided.", + } + + # Build API-level filter (for fields the API supports filtering on) + filter_parts = [] + + if state: + filter_parts.append(f'state="{state}"') + + if severity: + if " OR " in severity.upper(): + sevs = [s.strip() for s in severity.upper().split(" OR ")] + sev_filter = " OR ".join([f'severity="{s}"' for s in sevs]) + filter_parts.append(f"({sev_filter})") + else: + filter_parts.append(f'severity="{severity.upper()}"') + + filter_str = " AND ".join(filter_parts) if filter_parts else "" + + parent = f"projects/{project_id}/sources/-/locations/{location}" + + logger.info( + f"Searching findings by compliance for project: {project_id}, " + f"search_text='{search_text}', standard='{compliance_standard}', " + f"version='{compliance_version}', id='{compliance_id}'" + ) + + try: + # Fetch findings from the API (with API-level filters only) + # We fetch more than max_findings because client-side filtering will reduce the count + fetch_limit = max_findings * 10 # Overfetch to account for client-side filtering + request_args = { + "parent": parent, + "filter": filter_str, + "page_size": min(fetch_limit, 1000), + } + + response_pager = scc_client.list_findings(request=request_args) + + matched_findings = [] + scanned_count = 0 + search_text_lower = search_text.lower() if search_text else None + + for page in response_pager.pages: + for item in page.list_findings_results: + if len(matched_findings) >= max_findings: + break + if scanned_count >= fetch_limit: + break + + scanned_count += 1 + finding_dict = proto_message_to_dict(item.finding) + + # --- Client-side compliance filtering --- + compliances = finding_dict.get("compliances", []) + description = finding_dict.get("description", "") + category = finding_dict.get("category", "") + + # Check compliance_standard filter + if compliance_standard: + standard_lower = compliance_standard.lower() + has_standard = any( + standard_lower in c.get("standard", "").lower() + for c in compliances + ) + if not has_standard: + continue + + # Check compliance_version filter + if compliance_version: + has_version = any( + c.get("version") == compliance_version + for c in compliances + ) + if not has_version: + continue + + # Check compliance_id filter + if compliance_id: + has_id = any( + compliance_id in c.get("ids", []) + for c in compliances + ) + if not has_id: + continue + + # Check search_text filter (matches description, category, or compliance standard names) + if search_text_lower: + text_match = ( + search_text_lower in description.lower() + or search_text_lower in category.lower() + or any( + search_text_lower in c.get("standard", "").lower() + for c in compliances + ) + ) + if not text_match: + continue + + # Finding passed all filters — enrich with compliance summary + compliance_summary = [] + for c in compliances: + comp_entry = { + "standard": c.get("standard"), + "version": c.get("version"), + "ids": c.get("ids", []), + } + compliance_summary.append(comp_entry) + + finding_dict["compliance_summary"] = compliance_summary + matched_findings.append(finding_dict) + + if len(matched_findings) >= max_findings or scanned_count >= fetch_limit: + break + + return { + "findings": matched_findings, + "count": len(matched_findings), + "scanned_count": scanned_count, + "filters_applied": { + "search_text": search_text, + "compliance_standard": compliance_standard, + "compliance_version": compliance_version, + "compliance_id": compliance_id, + "severity": severity, + "state": state, + }, + "more_findings_may_exist": scanned_count >= fetch_limit and len(matched_findings) >= max_findings, + } + + except google_exceptions.NotFound as e: + logger.error(f"Project not found for compliance search on {parent}: {e}") + return {"error": "Not Found", "details": f"Could not find project '{project_id}'. {str(e)}"} + except google_exceptions.PermissionDenied as e: + logger.error(f"Permission denied for compliance search on {parent}: {e}") + return {"error": "Permission Denied", "details": str(e)} + except google_exceptions.InvalidArgument as e: + logger.error(f"Invalid argument for compliance search on {parent}: {e}") + return {"error": "Invalid Argument", "details": str(e)} + except Exception as e: + logger.error(f"Unexpected error in compliance search: {e}", exc_info=True) + return {"error": "An unexpected error occurred", "details": str(e)} + + @mcp.tool() async def top_vulnerability_findings( project_id: str, max_findings: int = 20, + location: str = "global", ) -> Dict[str, Any]: """Name: top_vulnerability_findings @@ -83,11 +502,13 @@ async def top_vulnerability_findings( Parameters: project_id (required): The Google Cloud project ID (e.g., 'my-gcp-project'). max_findings (optional): The maximum number of findings to return. Defaults to 20. + location (optional): The Google Cloud location for SCC v2 (e.g., 'global', 'us-central1'). Defaults to 'global'. """ if not scc_client: return {"error": "Security Center Client not initialized."} - parent = f"projects/{project_id}/sources/-" # Search across all sources in the project + # SCC v2: parent must include /locations/{location} + parent = f"projects/{project_id}/sources/-/locations/{location}" # Filter for active, high/critical vulnerability findings filter_str = 'state="ACTIVE" AND findingClass="VULNERABILITY" AND (severity="HIGH" OR severity="CRITICAL")' @@ -112,8 +533,8 @@ async def top_vulnerability_findings( if page: for item in page.list_findings_results: finding_dict = proto_message_to_dict(item.finding) - # Extract scores, handling potential absence - attack_exposure_score = finding_dict.get("attackExposureScore") # Directly under finding + # SCC v2: attackExposure is a nested message { score: float, ... } + attack_exposure_score = finding_dict.get("attackExposure", {}).get("score") finding_summary = { "name": finding_dict.get("name"), @@ -172,7 +593,8 @@ async def get_finding_remediation( project_id: str, resource_name: str = None, category: str = None, - finding_id: str = None + finding_id: str = None, + location: str = "global", ) -> Dict[str, Any]: """Name: get_finding_remediation @@ -186,6 +608,7 @@ async def get_finding_remediation( (e.g., '//container.googleapis.com/projects/my-project/locations/us-central1/clusters/my-cluster') category (optional): The category of the finding (e.g., 'GKE_SECURITY_BULLETIN'). finding_id (optional): The ID of the finding to search for directly. + location (optional): The Google Cloud location for SCC v2 (e.g., 'global', 'us-central1'). Defaults to 'global'. """ if not scc_client: return {"error": "Security Center Client not initialized."} @@ -199,13 +622,15 @@ async def get_finding_remediation( first_finding_result = None scc_error = None - parent = f"projects/{project_id}/sources/-" # Define parent once + # SCC v2: parent must include /locations/{location} + parent = f"projects/{project_id}/sources/-/locations/{location}" filter_str = "" # Initialize filter string try: if finding_id: - # --- Use list_findings with name filter for finding_id (V1 Client) --- - finding_name_to_filter = f"projects/{project_id}/sources/-/findings/{finding_id}" + # --- Use list_findings with name filter for finding_id (SCC v2 Client) --- + # v2 finding names include /locations/{location}/ + finding_name_to_filter = f"projects/{project_id}/sources/-/locations/{location}/findings/{finding_id}" filter_str = f'name="{finding_name_to_filter}"' logger.info(f"Attempting to list findings by name filter: {filter_str}") scc_request_args = { From e4cd6508cba6920f39828712a0888e2c84a8c134 Mon Sep 17 00:00:00 2001 From: lucca Date: Wed, 3 Jun 2026 16:06:39 -0300 Subject: [PATCH 2/2] feat(scc): fix sorting, add mute tool, extract filter helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - top_vulnerability_findings: fetch up to min(max*10, 1000) candidates before sorting by attack exposure — previously only sorted the first page (20 findings), making "top by score" meaningless on large projects - top_vulnerability_findings: include nextSteps in output for immediate actionability without a follow-up get_finding_remediation call - add set_finding_mute tool: mute/unmute findings via SCC v2 set_mute API - extract _build_parent() and _build_or_filter() helpers — removed 5x duplicated parent path construction and OR-filter building logic - fix more_findings_may_exist in search_findings to use pager token instead of a count heuristic - update README: document set_finding_mute and IAM role distinction --- server/scc/README.md | 15 ++- server/scc/scc_mcp.py | 232 ++++++++++++++++++++++++------------------ 2 files changed, 146 insertions(+), 101 deletions(-) diff --git a/server/scc/README.md b/server/scc/README.md index d684c61e..da8264cf 100644 --- a/server/scc/README.md +++ b/server/scc/README.md @@ -43,11 +43,20 @@ This is an MCP (Model Context Protocol) server for interacting with Google Cloud - `max_findings` (optional): Maximum findings to return. Defaults to 50. - `location` (optional): Google Cloud location for SCC v2. Defaults to `global`. -- **`top_vulnerability_findings(project_id, max_findings=20)`** - - **Description**: Lists the top ACTIVE, HIGH or CRITICAL severity findings of class VULNERABILITY for a specific project, sorted by Attack Exposure Score (descending). Includes the Attack Exposure score in the output if available. Aids prioritization for remediation. +- **`set_finding_mute(project_id, finding_id, mute, location="global")`** + - **Description**: Mutes or unmutes a specific SCC finding. Muted findings are hidden from default views but retained for compliance. Use to suppress accepted risks or resurface previously muted findings. + - **Parameters**: + - `project_id` (required): The Google Cloud project ID. + - `finding_id` (required): The ID of the finding to mute or unmute. + - `mute` (required): The mute state to set: `MUTED` or `UNMUTED`. + - `location` (optional): Google Cloud location for SCC v2. Defaults to `global`. + +- **`top_vulnerability_findings(project_id, max_findings=20, location="global")`** + - **Description**: Lists the top ACTIVE, HIGH or CRITICAL severity findings of class VULNERABILITY for a specific project, sorted by Attack Exposure Score (descending). Fetches up to 10× `max_findings` candidates (capped at 1000) to ensure the sort is meaningful across the full population. Includes Attack Exposure score and remediation steps (`nextSteps`). - **Parameters**: - `project_id` (required): The Google Cloud project ID (e.g., 'my-gcp-project'). - `max_findings` (optional): The maximum number of findings to return. Defaults to 20. + - `location` (optional): Google Cloud location for SCC v2. Defaults to `global`. - **`get_finding_remediation(project_id, resource_name=None, category=None, finding_id=None)`** - **Description**: Gets the remediation steps (`nextSteps`) for a specific finding within a project, along with details of the affected resource fetched from Cloud Asset Inventory (CAI). The finding can be identified either by its `resource_name` and `category` (for ACTIVE findings) or directly by its `finding_id` (regardless of state). @@ -95,7 +104,7 @@ The server uses Google Cloud's authentication mechanisms. Ensure you have one of ### Required IAM Permissions Appropriate IAM permissions are required on the target Google Cloud project(s): -- Security Command Center: `roles/securitycenter.adminViewer` or `roles/securitycenter.adminEditor` +- Security Command Center: `roles/securitycenter.adminViewer` (read-only tools) or `roles/securitycenter.adminEditor` (required for `set_finding_mute`) - Cloud Asset Inventory: `roles/cloudasset.viewer` ## License diff --git a/server/scc/scc_mcp.py b/server/scc/scc_mcp.py index efc2d7b3..d7f493e4 100644 --- a/server/scc/scc_mcp.py +++ b/server/scc/scc_mcp.py @@ -64,10 +64,21 @@ def proto_message_to_dict(message: Any) -> Dict[str, Any]: return json_format.MessageToDict(message._pb) except Exception as e: logger.error(f"Error converting protobuf message to dict: {e}") - # Fallback or re-raise depending on desired error handling return {"error": "Failed to serialize response part", "details": str(e)} +def _build_parent(project_id: str, location: str = "global") -> str: + return f"projects/{project_id}/sources/-/locations/{location}" + + +def _build_or_filter(field: str, value: str) -> str: + """Build a SCC filter clause supporting OR-separated values.""" + if " OR " in value.upper(): + parts = [p.strip().upper() for p in value.split(" OR ")] + return "(" + " OR ".join(f'{field}="{p}"' for p in parts) + ")" + return f'{field}="{value.upper()}"' + + # --- Security Command Center Tools --- @mcp.tool() @@ -117,49 +128,26 @@ async def search_findings( if not scc_client: return {"error": "Security Center Client not initialized."} - # Build filter string from parameters filter_parts = [] - if state: filter_parts.append(f'state="{state}"') - if finding_class: - # Support multiple classes with OR - if " OR " in finding_class.upper(): - classes = [c.strip() for c in finding_class.upper().split(" OR ")] - class_filter = " OR ".join([f'findingClass="{c}"' for c in classes]) - filter_parts.append(f"({class_filter})") - else: - filter_parts.append(f'findingClass="{finding_class.upper()}"') - + filter_parts.append(_build_or_filter("findingClass", finding_class)) if severity: - # Support multiple severities with OR - if " OR " in severity.upper(): - sevs = [s.strip() for s in severity.upper().split(" OR ")] - sev_filter = " OR ".join([f'severity="{s}"' for s in sevs]) - filter_parts.append(f"({sev_filter})") - else: - filter_parts.append(f'severity="{severity.upper()}"') - + filter_parts.append(_build_or_filter("severity", severity)) if category: filter_parts.append(f'category="{category}"') - if resource_name: filter_parts.append(f'resourceName="{resource_name}"') - if resource_type: filter_parts.append(f'resource.type="{resource_type}"') - if mute: filter_parts.append(f'mute="{mute.upper()}"') - if custom_filter: filter_parts.append(custom_filter) filter_str = " AND ".join(filter_parts) if filter_parts else "" - - # SCC v2: parent must include /locations/{location} - parent = f"projects/{project_id}/sources/-/locations/{location}" + parent = _build_parent(project_id, location) logger.info(f"Searching findings for project: {project_id}, filter: {filter_str}") @@ -174,25 +162,25 @@ async def search_findings( response_pager = scc_client.list_findings(request=request_args) all_findings = [] - count = 0 + last_page = None for page in response_pager.pages: + last_page = page for item in page.list_findings_results: - if count >= max_findings: + if len(all_findings) >= max_findings: break - finding_dict = proto_message_to_dict(item.finding) - all_findings.append(finding_dict) - count += 1 - if count >= max_findings: + all_findings.append(proto_message_to_dict(item.finding)) + if len(all_findings) >= max_findings: break - # Check if more findings exist - more_findings_exist = count >= max_findings + more_findings_may_exist = bool( + last_page and getattr(last_page, "next_page_token", None) + ) or len(all_findings) >= max_findings return { "findings": all_findings, "count": len(all_findings), "filter_applied": filter_str if filter_str else "No filter (all findings)", - "more_findings_may_exist": more_findings_exist, + "more_findings_may_exist": more_findings_may_exist, } except google_exceptions.NotFound as e: @@ -232,7 +220,7 @@ async def get_finding_details( if not scc_client: return {"error": "Security Center Client not initialized."} - parent = f"projects/{project_id}/sources/-/locations/{location}" + parent = _build_parent(project_id, location) finding_name_filter = f"projects/{project_id}/sources/-/locations/{location}/findings/{finding_id}" filter_str = f'name="{finding_name_filter}"' @@ -347,23 +335,14 @@ async def search_findings_by_compliance( "details": "At least one of search_text, compliance_standard, or compliance_id must be provided.", } - # Build API-level filter (for fields the API supports filtering on) filter_parts = [] - if state: filter_parts.append(f'state="{state}"') - if severity: - if " OR " in severity.upper(): - sevs = [s.strip() for s in severity.upper().split(" OR ")] - sev_filter = " OR ".join([f'severity="{s}"' for s in sevs]) - filter_parts.append(f"({sev_filter})") - else: - filter_parts.append(f'severity="{severity.upper()}"') + filter_parts.append(_build_or_filter("severity", severity)) filter_str = " AND ".join(filter_parts) if filter_parts else "" - - parent = f"projects/{project_id}/sources/-/locations/{location}" + parent = _build_parent(project_id, location) logger.info( f"Searching findings by compliance for project: {project_id}, " @@ -507,71 +486,54 @@ async def top_vulnerability_findings( if not scc_client: return {"error": "Security Center Client not initialized."} - # SCC v2: parent must include /locations/{location} - parent = f"projects/{project_id}/sources/-/locations/{location}" - # Filter for active, high/critical vulnerability findings + parent = _build_parent(project_id, location) filter_str = 'state="ACTIVE" AND findingClass="VULNERABILITY" AND (severity="HIGH" OR severity="CRITICAL")' - # Define a larger page size to fetch enough findings for sorting - fetch_page_size = 20 # Fetch up to 20 findings initially + # Fetch significantly more than max_findings so sort-by-attack-exposure is meaningful + # across the full population, not just the first page. + final_max = max(max_findings if max_findings and max_findings > 0 else 20, 1) + fetch_size = min(final_max * 10, 1000) - logger.info(f"Getting top vulnerability findings for project: {project_id}") - logger.debug(f"Using parent: {parent}, filter: {filter_str}, fetching up to {fetch_page_size} findings for sorting.") + logger.info(f"Getting top vulnerability findings for project: {project_id}, fetching up to {fetch_size} for sorting") try: - request_args = { + response_pager = scc_client.list_findings(request={ "parent": parent, "filter": filter_str, - "page_size": fetch_page_size, # Use the larger fetch size here - } + "page_size": min(fetch_size, 1000), + }) - response_pager = scc_client.list_findings(request=request_args) - - all_fetched_findings = [] - # Iterate through the first page (up to fetch_page_size) - page = next(iter(response_pager.pages), None) - if page: + all_fetched = [] + for page in response_pager.pages: for item in page.list_findings_results: + if len(all_fetched) >= fetch_size: + break finding_dict = proto_message_to_dict(item.finding) - # SCC v2: attackExposure is a nested message { score: float, ... } - attack_exposure_score = finding_dict.get("attackExposure", {}).get("score") - - finding_summary = { + all_fetched.append({ "name": finding_dict.get("name"), "category": finding_dict.get("category"), "resourceName": finding_dict.get("resourceName"), "severity": finding_dict.get("severity"), "description": finding_dict.get("description", "No description provided."), - "attackExposureScore": attack_exposure_score, # Include score - } - all_fetched_findings.append(finding_summary) - - # Sort the findings: Attack Exposure Score (desc) - # Treat None scores as lowest (-1.0) - def sort_key(f): - # Prioritize Attack Exposure Score (higher is worse) - # Treat None as -1.0 for sorting purposes to put them last in descending order - aes = f.get("attackExposureScore", -1.0) - # Handle potential non-numeric values if they can occur - aes = float(aes) if aes is not None else -1.0 - # Sort ONLY by Attack Exposure Score - return aes # Return only AES for sorting - - all_fetched_findings.sort(key=sort_key, reverse=True) # Sort descending by AES - - # Limit results to max_findings - # Ensure max_findings is not None and is positive before slicing - final_max_findings = max_findings if max_findings is not None and max_findings > 0 else 20 - sorted_findings = all_fetched_findings[:final_max_findings] - - # Determine if more findings *might* exist beyond the initial fetch_page_size - more_findings_exist = bool(response_pager.next_page_token) or len(all_fetched_findings) == fetch_page_size + "nextSteps": finding_dict.get("nextSteps", ""), + "attackExposureScore": ( + finding_dict.get("attackExposure") or {} + ).get("score"), + }) + if len(all_fetched) >= fetch_size: + break + + all_fetched.sort( + key=lambda f: float(f["attackExposureScore"]) if f["attackExposureScore"] is not None else -1.0, + reverse=True, + ) + sorted_findings = all_fetched[:final_max] return { - "top_findings": sorted_findings, # Return the sorted and limited list + "top_findings": sorted_findings, "count": len(sorted_findings), - # Indicate if there might be more findings beyond the *initial fetch* limit - "more_findings_exist_beyond_fetch_limit": more_findings_exist + "fetched_for_sorting": len(all_fetched), + "more_findings_exist_beyond_fetch_limit": len(all_fetched) >= fetch_size, } except google_exceptions.NotFound as e: @@ -622,9 +584,8 @@ async def get_finding_remediation( first_finding_result = None scc_error = None - # SCC v2: parent must include /locations/{location} - parent = f"projects/{project_id}/sources/-/locations/{location}" - filter_str = "" # Initialize filter string + parent = _build_parent(project_id, location) + filter_str = "" try: if finding_id: @@ -739,6 +700,81 @@ async def get_finding_remediation( logger.error(f"An unexpected error occurred in get_finding_remediation: {e}", exc_info=True) return {"error": "An unexpected error occurred", "details": str(e)} +@mcp.tool() +async def set_finding_mute( + project_id: str, + finding_id: str, + mute: str, + location: str = "global", +) -> Dict[str, Any]: + """Name: set_finding_mute + + Description: Mutes or unmutes a specific Security Command Center finding. Muted findings are + hidden from default views but remain in the system for compliance purposes. Use + MUTED to suppress known/accepted risks and UNMUTED to resurface them. + Parameters: + project_id (required): The Google Cloud project ID (e.g., 'my-gcp-project'). + finding_id (required): The ID of the finding to mute or unmute. + mute (required): The mute state to set. Valid values: MUTED, UNMUTED. + location (optional): The Google Cloud location for SCC v2. Defaults to 'global'. + """ + if not scc_client: + return {"error": "Security Center Client not initialized."} + + mute_upper = mute.upper() + if mute_upper not in ("MUTED", "UNMUTED"): + return {"error": "Invalid mute value", "details": "Must be 'MUTED' or 'UNMUTED'."} + + # set_mute requires the canonical finding name (with actual source ID, not wildcard). + # Look up the finding first via list_findings to resolve the full name. + parent = _build_parent(project_id, location) + filter_str = f'name="projects/{project_id}/sources/-/locations/{location}/findings/{finding_id}"' + + logger.info(f"Resolving finding {finding_id} for mute operation in project {project_id}") + + try: + response_pager = scc_client.list_findings(request={ + "parent": parent, + "filter": filter_str, + "page_size": 1, + }) + page = next(iter(response_pager.pages), None) + finding = None + if page and page.list_findings_results: + finding = list(page.list_findings_results)[0].finding + + if not finding: + return {"error": "Finding not found", "details": f"No finding with ID '{finding_id}' in project '{project_id}'."} + + full_name = finding.name + mute_enum = securitycenter_v2.Finding.Mute[mute_upper] + + request = securitycenter_v2.SetMuteFindingRequest( + name=full_name, + mute=mute_enum, + ) + updated = scc_client.set_mute(request=request) + + return { + "success": True, + "finding_name": full_name, + "mute_state": mute_upper, + "finding": proto_message_to_dict(updated), + } + + except google_exceptions.NotFound as e: + logger.error(f"Finding not found: {e}") + return {"error": "Not Found", "details": str(e)} + except google_exceptions.PermissionDenied as e: + logger.error(f"Permission denied setting mute: {e}") + return {"error": "Permission Denied", "details": str(e)} + except KeyError: + return {"error": "Invalid mute value", "details": f"'{mute}' is not a valid Mute enum value. Use MUTED or UNMUTED."} + except Exception as e: + logger.error(f"Unexpected error setting mute on {finding_id}: {e}", exc_info=True) + return {"error": "An unexpected error occurred", "details": str(e)} + + # --- Main execution --- def main() -> None: