"""Path and URL validation utilities for crewai-tools.

Provides validation for file paths and URLs to prevent unauthorized
file access and server-side request forgery (SSRF) when tools accept
user-controlled or LLM-controlled inputs at runtime.

Set CREWAI_TOOLS_ALLOW_UNSAFE_PATHS=true to bypass validation (not
recommended for production).
"""

from __future__ import annotations

import ipaddress
import logging
import os
import socket
from urllib.parse import urlparse


logger = logging.getLogger(__name__)

_UNSAFE_PATHS_ENV = "CREWAI_TOOLS_ALLOW_UNSAFE_PATHS"


def _is_escape_hatch_enabled() -> bool:
    """Check if the unsafe paths escape hatch is enabled.

    The environment variable is read on every call and compared
    case-insensitively against ``"true"``, ``"1"`` and ``"yes"``.
    """
    return os.environ.get(_UNSAFE_PATHS_ENV, "").lower() in ("true", "1", "yes")


# ---------------------------------------------------------------------------
# File path validation
# ---------------------------------------------------------------------------


def validate_file_path(path: str, base_dir: str | None = None) -> str:
    """Validate that a file path is safe to read.

    Resolves symlinks and ``..`` components, then checks that the resolved
    path falls within *base_dir* (defaults to the current working directory).

    Args:
        path: The file path to validate. Relative paths are interpreted
            relative to *base_dir*.
        base_dir: Allowed root directory. Defaults to ``os.getcwd()``.

    Returns:
        The resolved, validated absolute path.

    Raises:
        ValueError: If the path escapes the allowed directory.
    """
    if base_dir is None:
        base_dir = os.getcwd()

    resolved_base = os.path.realpath(base_dir)
    resolved_path = os.path.realpath(
        os.path.join(resolved_base, path) if not os.path.isabs(path) else path
    )

    if _is_escape_hatch_enabled():
        logger.warning(
            "%s is enabled — skipping file path validation for: %s",
            _UNSAFE_PATHS_ENV,
            path,
        )
        # Return the same resolution the validated branch would produce, so
        # toggling the escape hatch never changes *which* file a relative
        # path refers to when a custom base_dir is supplied.
        return resolved_path

    # Ensure the resolved path is within the base directory.
    # When resolved_base already ends with a separator (e.g. the filesystem
    # root "/"), appending os.sep would double it ("//"), so use the base
    # as-is in that case. Comparing against "base + sep" (rather than a bare
    # prefix) keeps sibling directories such as "<base>_other" out.
    prefix = resolved_base if resolved_base.endswith(os.sep) else resolved_base + os.sep
    if not resolved_path.startswith(prefix) and resolved_path != resolved_base:
        raise ValueError(
            f"Path '{path}' resolves to '{resolved_path}' which is outside "
            f"the allowed directory '{resolved_base}'. "
            f"Set {_UNSAFE_PATHS_ENV}=true to bypass this check."
        )

    return resolved_path


def validate_directory_path(path: str, base_dir: str | None = None) -> str:
    """Validate that a directory path is safe to read.

    Same as :func:`validate_file_path` but also checks that the path
    is an existing directory.

    Args:
        path: The directory path to validate.
        base_dir: Allowed root directory. Defaults to ``os.getcwd()``.

    Returns:
        The resolved, validated absolute path.

    Raises:
        ValueError: If the path escapes the allowed directory or is not a directory.
    """
    validated = validate_file_path(path, base_dir)
    if not os.path.isdir(validated):
        raise ValueError(f"Path '{validated}' is not a directory.")
    return validated


# ---------------------------------------------------------------------------
# URL validation
# ---------------------------------------------------------------------------

# Private and reserved IP ranges that should not be accessed
_BLOCKED_IPV4_NETWORKS = [
    ipaddress.ip_network("0.0.0.0/8"),  # "This network", includes 0.0.0.0
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("100.64.0.0/10"),  # Shared address space / CGNAT
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local / cloud metadata
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.0.0.0/24"),  # IETF protocol assignments
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("224.0.0.0/4"),  # Multicast
    ipaddress.ip_network("240.0.0.0/4"),  # Reserved, includes broadcast
]

_BLOCKED_IPV6_NETWORKS = [
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("::/128"),
    ipaddress.ip_network("fc00::/7"),  # Unique local addresses
    ipaddress.ip_network("fe80::/10"),  # Link-local IPv6
    ipaddress.ip_network("fec0::/10"),  # Deprecated site-local
    ipaddress.ip_network("ff00::/8"),  # Multicast
]

# NAT64 well-known prefix: the low 32 bits carry the target IPv4 address.
_NAT64_WELL_KNOWN_PREFIX = ipaddress.ip_network("64:ff9b::/96")


def _embedded_ipv4(addr: ipaddress.IPv6Address) -> ipaddress.IPv4Address | None:
    """Return the IPv4 address carried inside *addr*, if any.

    Handles IPv4-mapped (``::ffff:a.b.c.d``), 6to4 (``2002::/16``) and
    NAT64 well-known-prefix (``64:ff9b::/96``) addresses. Returns ``None``
    for every other IPv6 address.
    """
    if addr.ipv4_mapped is not None:
        return addr.ipv4_mapped
    if addr.sixtofour is not None:
        return addr.sixtofour
    if addr in _NAT64_WELL_KNOWN_PREFIX:
        return ipaddress.IPv4Address(int(addr) & 0xFFFFFFFF)
    return None


def _is_private_or_reserved(ip_str: str) -> bool:
    """Check if an IP address is private, reserved, or otherwise unsafe.

    Args:
        ip_str: Textual IPv4 or IPv6 address.

    Returns:
        ``True`` when the address (or an IPv4 address embedded in an IPv6
        transition address) falls in a blocked network, or when *ip_str*
        cannot be parsed at all; ``False`` otherwise.
    """
    try:
        addr = ipaddress.ip_address(ip_str)
    except ValueError:
        return True  # If we can't parse, block it

    if isinstance(addr, ipaddress.IPv6Address):
        if any(addr in network for network in _BLOCKED_IPV6_NETWORKS):
            return True
        # Unwrap IPv6 forms that merely wrap an IPv4 target so the IPv4
        # blocklist cannot be sidestepped with e.g. ::ffff:127.0.0.1.
        # An IPv4Address is only ever tested against IPv4 networks.
        embedded = _embedded_ipv4(addr)
        if embedded is None:
            return False
        addr = embedded

    return any(addr in network for network in _BLOCKED_IPV4_NETWORKS)


def validate_url(url: str) -> str:
    """Validate that a URL is safe to fetch.

    Blocks ``file://`` scheme entirely. For ``http``/``https``, resolves
    DNS and checks that the target IP is not private or reserved (prevents
    SSRF to internal services and cloud metadata endpoints).

    Only the addresses returned at validation time are checked and the URL
    string is returned unchanged, so a client that resolves the hostname
    again or follows redirects is not covered by this check.

    Args:
        url: The URL to validate.

    Returns:
        The validated URL string.

    Raises:
        ValueError: If the URL uses a blocked scheme, has no hostname or an
            invalid port, cannot be resolved, or resolves to a
            private/reserved IP address.
    """
    if _is_escape_hatch_enabled():
        logger.warning(
            "%s is enabled — skipping URL validation for: %s",
            _UNSAFE_PATHS_ENV,
            url,
        )
        return url

    parsed = urlparse(url)

    # Block file:// scheme
    if parsed.scheme == "file":
        raise ValueError(
            f"file:// URLs are not allowed: '{url}'. "
            f"Use a file path instead, or set {_UNSAFE_PATHS_ENV}=true to bypass."
        )

    # Only allow http and https
    if parsed.scheme not in ("http", "https"):
        raise ValueError(
            f"URL scheme '{parsed.scheme}' is not allowed. Only http and https are supported."
        )

    if not parsed.hostname:
        raise ValueError(f"URL has no hostname: '{url}'")

    # ``.port`` parses lazily and raises ValueError for non-numeric or
    # out-of-range ports; surface that with a message naming the URL.
    try:
        port = parsed.port
    except ValueError as exc:
        raise ValueError(f"URL has an invalid port: '{url}'") from exc

    # Resolve DNS and check IPs
    try:
        addrinfos = socket.getaddrinfo(
            parsed.hostname, port or (443 if parsed.scheme == "https" else 80)
        )
    except (socket.gaierror, UnicodeError) as exc:
        # UnicodeError covers hostnames that cannot be IDNA-encoded.
        raise ValueError(f"Could not resolve hostname: '{parsed.hostname}'") from exc

    # Fail closed: with no resolved address there is nothing to vouch for.
    if not addrinfos:
        raise ValueError(f"Could not resolve hostname: '{parsed.hostname}'")

    for _family, _, _, _, sockaddr in addrinfos:
        ip_str = str(sockaddr[0])
        if _is_private_or_reserved(ip_str):
            raise ValueError(
                f"URL '{url}' resolves to private/reserved IP {ip_str}. "
                f"Access to internal networks is not allowed. "
                f"Set {_UNSAFE_PATHS_ENV}=true to bypass."
            )

    return url
a/lib/crewai-tools/src/crewai_tools/tools/contextualai_parse_tool/contextual_parse_tool.py b/lib/crewai-tools/src/crewai_tools/tools/contextualai_parse_tool/contextual_parse_tool.py index 1a0317172e..99ef715144 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/contextualai_parse_tool/contextual_parse_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/contextualai_parse_tool/contextual_parse_tool.py @@ -1,6 +1,8 @@ from crewai.tools import BaseTool from pydantic import BaseModel, Field +from crewai_tools.security.safe_path import validate_file_path + class ContextualAIParseSchema(BaseModel): """Schema for contextual parse tool.""" @@ -45,6 +47,7 @@ def _run( """Parse a document using Contextual AI's parser.""" if output_types is None: output_types = ["markdown-per-page"] + file_path = validate_file_path(file_path) try: import json import os diff --git a/lib/crewai-tools/src/crewai_tools/tools/directory_read_tool/directory_read_tool.py b/lib/crewai-tools/src/crewai_tools/tools/directory_read_tool/directory_read_tool.py index f65b1b82dc..cd5b31bcca 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/directory_read_tool/directory_read_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/directory_read_tool/directory_read_tool.py @@ -4,6 +4,8 @@ from crewai.tools import BaseTool from pydantic import BaseModel, Field +from crewai_tools.security.safe_path import validate_directory_path + class FixedDirectoryReadToolSchema(BaseModel): """Input for DirectoryReadTool.""" @@ -39,6 +41,7 @@ def _run( if directory is None: raise ValueError("Directory must be provided.") + directory = validate_directory_path(directory) if directory[-1] == "/": directory = directory[:-1] files_list = [ diff --git a/lib/crewai-tools/src/crewai_tools/tools/directory_search_tool/directory_search_tool.py b/lib/crewai-tools/src/crewai_tools/tools/directory_search_tool/directory_search_tool.py index f17c4699af..3f6f278aee 100644 --- 
a/lib/crewai-tools/src/crewai_tools/tools/directory_search_tool/directory_search_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/directory_search_tool/directory_search_tool.py @@ -3,8 +3,8 @@ from pydantic import BaseModel, Field from crewai_tools.rag.data_types import DataType +from crewai_tools.security.safe_path import validate_directory_path from crewai_tools.tools.rag.rag_tool import RagTool -from crewai_tools.utilities.safe_path import validate_directory_path class FixedDirectorySearchToolSchema(BaseModel): @@ -38,7 +38,7 @@ def __init__(self, directory: str | None = None, **kwargs: Any) -> None: self._generate_description() def add(self, directory: str) -> None: # type: ignore[override] - validate_directory_path(directory) + directory = validate_directory_path(directory) super().add(directory, data_type=DataType.DIRECTORY) def _run( # type: ignore[override] diff --git a/lib/crewai-tools/src/crewai_tools/tools/file_read_tool/file_read_tool.py b/lib/crewai-tools/src/crewai_tools/tools/file_read_tool/file_read_tool.py index 2c56a70cd6..428d19d7d5 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/file_read_tool/file_read_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/file_read_tool/file_read_tool.py @@ -3,6 +3,8 @@ from crewai.tools import BaseTool from pydantic import BaseModel, Field +from crewai_tools.security.safe_path import validate_file_path + class FileReadToolSchema(BaseModel): """Input for FileReadTool.""" @@ -76,6 +78,7 @@ def _run( if file_path is None: return "Error: No file path provided. Please provide a file path either in the constructor or as an argument." 
+ file_path = validate_file_path(file_path) try: with open(file_path, "r") as file: if start_line == 1 and line_count is None: diff --git a/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py b/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py index 15861d9872..8a759263a6 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/files_compressor_tool/files_compressor_tool.py @@ -5,6 +5,8 @@ from crewai.tools import BaseTool from pydantic import BaseModel, Field +from crewai_tools.security.safe_path import validate_file_path + class FileCompressorToolInput(BaseModel): """Input schema for FileCompressorTool.""" @@ -40,12 +42,15 @@ def _run( overwrite: bool = False, format: str = "zip", ) -> str: + input_path = validate_file_path(input_path) if not os.path.exists(input_path): return f"Input path '{input_path}' does not exist." if not output_path: output_path = self._generate_output_path(input_path, format) + output_path = validate_file_path(output_path) + format_extension = { "zip": ".zip", "tar": ".tar", diff --git a/lib/crewai-tools/src/crewai_tools/tools/firecrawl_crawl_website_tool/firecrawl_crawl_website_tool.py b/lib/crewai-tools/src/crewai_tools/tools/firecrawl_crawl_website_tool/firecrawl_crawl_website_tool.py index cce84c5220..47e98135ca 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/firecrawl_crawl_website_tool/firecrawl_crawl_website_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/firecrawl_crawl_website_tool/firecrawl_crawl_website_tool.py @@ -5,6 +5,8 @@ from crewai.tools import BaseTool, EnvVar from pydantic import BaseModel, ConfigDict, Field, PrivateAttr +from crewai_tools.security.safe_path import validate_url + try: from firecrawl import FirecrawlApp # type: ignore[import-untyped] @@ -106,6 +108,7 @@ def _run(self, url: str) -> Any: if not self._firecrawl: raise 
RuntimeError("FirecrawlApp not properly initialized") + url = validate_url(url) return self._firecrawl.crawl(url=url, poll_interval=2, **self.config) diff --git a/lib/crewai-tools/src/crewai_tools/tools/firecrawl_scrape_website_tool/firecrawl_scrape_website_tool.py b/lib/crewai-tools/src/crewai_tools/tools/firecrawl_scrape_website_tool/firecrawl_scrape_website_tool.py index 684cc96178..35b0029612 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/firecrawl_scrape_website_tool/firecrawl_scrape_website_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/firecrawl_scrape_website_tool/firecrawl_scrape_website_tool.py @@ -5,6 +5,8 @@ from crewai.tools import BaseTool, EnvVar from pydantic import BaseModel, ConfigDict, Field, PrivateAttr +from crewai_tools.security.safe_path import validate_url + try: from firecrawl import FirecrawlApp # type: ignore[import-untyped] @@ -106,6 +108,7 @@ def _run(self, url: str) -> Any: if not self._firecrawl: raise RuntimeError("FirecrawlApp not properly initialized") + url = validate_url(url) return self._firecrawl.scrape(url=url, **self.config) diff --git a/lib/crewai-tools/src/crewai_tools/tools/hyperbrowser_load_tool/hyperbrowser_load_tool.py b/lib/crewai-tools/src/crewai_tools/tools/hyperbrowser_load_tool/hyperbrowser_load_tool.py index 4cf52adab5..50a752d196 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/hyperbrowser_load_tool/hyperbrowser_load_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/hyperbrowser_load_tool/hyperbrowser_load_tool.py @@ -4,6 +4,8 @@ from crewai.tools import BaseTool, EnvVar from pydantic import BaseModel, Field +from crewai_tools.security.safe_path import validate_url + class HyperbrowserLoadToolSchema(BaseModel): url: str = Field(description="Website URL") @@ -119,6 +121,7 @@ def _run( ) from e params = self._prepare_params(params) + url = validate_url(url) if operation == "scrape": scrape_params = StartScrapeJobParams(url=url, **params) diff --git 
a/lib/crewai-tools/src/crewai_tools/tools/jina_scrape_website_tool/jina_scrape_website_tool.py b/lib/crewai-tools/src/crewai_tools/tools/jina_scrape_website_tool/jina_scrape_website_tool.py index 229df0f8ce..6762b60e8c 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/jina_scrape_website_tool/jina_scrape_website_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/jina_scrape_website_tool/jina_scrape_website_tool.py @@ -4,6 +4,8 @@ from pydantic import BaseModel, Field import requests +from crewai_tools.security.safe_path import validate_url + class JinaScrapeWebsiteToolInput(BaseModel): """Input schema for JinaScrapeWebsiteTool.""" @@ -45,6 +47,7 @@ def _run(self, website_url: str | None = None) -> str: "Website URL must be provided either during initialization or execution" ) + url = validate_url(url) response = requests.get( f"https://r.jina.ai/{url}", headers=self.headers, timeout=15 ) diff --git a/lib/crewai-tools/src/crewai_tools/tools/ocr_tool/ocr_tool.py b/lib/crewai-tools/src/crewai_tools/tools/ocr_tool/ocr_tool.py index 89ae45fb65..9a21062333 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/ocr_tool/ocr_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/ocr_tool/ocr_tool.py @@ -11,6 +11,8 @@ from crewai.utilities.types import LLMMessage from pydantic import BaseModel, Field +from crewai_tools.security.safe_path import validate_file_path + class OCRToolSchema(BaseModel): """Input schema for Optical Character Recognition Tool. @@ -98,5 +100,6 @@ def _encode_image(image_path: str) -> str: Returns: str: Base64-encoded image data as a UTF-8 string. 
""" + image_path = validate_file_path(image_path) with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode() diff --git a/lib/crewai-tools/src/crewai_tools/tools/rag/rag_tool.py b/lib/crewai-tools/src/crewai_tools/tools/rag/rag_tool.py index eb7e9cefd0..8099443e2d 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/rag/rag_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/rag/rag_tool.py @@ -251,7 +251,7 @@ def add( # unauthorized file reads and SSRF. from urllib.parse import urlparse - from crewai_tools.utilities.safe_path import validate_file_path, validate_url + from crewai_tools.security.safe_path import validate_file_path, validate_url def _check_url(value: str, label: str) -> None: try: @@ -259,9 +259,9 @@ def _check_url(value: str, label: str) -> None: except ValueError as e: raise ValueError(f"Blocked unsafe {label}: {e}") from e - def _check_path(value: str, label: str) -> None: + def _check_path(value: str, label: str) -> str: try: - validate_file_path(value) + return validate_file_path(value) except ValueError as e: raise ValueError(f"Blocked unsafe {label}: {e}") from e @@ -298,21 +298,32 @@ def _check_path(value: str, label: str) -> None: or os.path.isabs(source_ref) ): try: - validate_file_path(source_ref) + resolved_ref = validate_file_path(source_ref) except ValueError as e: raise ValueError(f"Blocked unsafe file path: {e}") from e + # Use the resolved path to prevent symlink TOCTOU + if isinstance(arg, dict): + arg = {**arg} + if "source" in arg: + arg["source"] = resolved_ref + elif "content" in arg: + arg["content"] = resolved_ref + else: + arg = resolved_ref validated_args.append(arg) # Validate keyword path/URL arguments — these are equally user-controlled # and must not bypass the checks applied to positional args. 
if "path" in kwargs and kwargs.get("path") is not None: - _check_path(str(kwargs["path"]), "path") + kwargs["path"] = _check_path(str(kwargs["path"]), "path") if "file_path" in kwargs and kwargs.get("file_path") is not None: - _check_path(str(kwargs["file_path"]), "file_path") + kwargs["file_path"] = _check_path(str(kwargs["file_path"]), "file_path") if "directory_path" in kwargs and kwargs.get("directory_path") is not None: - _check_path(str(kwargs["directory_path"]), "directory_path") + kwargs["directory_path"] = _check_path( + str(kwargs["directory_path"]), "directory_path" + ) if "url" in kwargs and kwargs.get("url") is not None: _check_url(str(kwargs["url"]), "url") diff --git a/lib/crewai-tools/src/crewai_tools/tools/scrape_element_from_website/scrape_element_from_website.py b/lib/crewai-tools/src/crewai_tools/tools/scrape_element_from_website/scrape_element_from_website.py index fc7b69a7c6..7bba12b72f 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/scrape_element_from_website/scrape_element_from_website.py +++ b/lib/crewai-tools/src/crewai_tools/tools/scrape_element_from_website/scrape_element_from_website.py @@ -5,6 +5,8 @@ from pydantic import BaseModel, Field import requests +from crewai_tools.security.safe_path import validate_url + try: from bs4 import BeautifulSoup @@ -81,6 +83,7 @@ def _run( if website_url is None or css_element is None: raise ValueError("Both website_url and css_element must be provided.") + website_url = validate_url(website_url) page = requests.get( website_url, headers=self.headers, diff --git a/lib/crewai-tools/src/crewai_tools/tools/scrape_website_tool/scrape_website_tool.py b/lib/crewai-tools/src/crewai_tools/tools/scrape_website_tool/scrape_website_tool.py index 375fcb6b4e..d297dfe086 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/scrape_website_tool/scrape_website_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/scrape_website_tool/scrape_website_tool.py @@ -5,6 +5,8 @@ from pydantic import Field import 
requests +from crewai_tools.security.safe_path import validate_url + try: from bs4 import BeautifulSoup @@ -73,6 +75,7 @@ def _run( if website_url is None: raise ValueError("Website URL must be provided.") + website_url = validate_url(website_url) page = requests.get( website_url, timeout=15, diff --git a/lib/crewai-tools/src/crewai_tools/tools/scrapfly_scrape_website_tool/scrapfly_scrape_website_tool.py b/lib/crewai-tools/src/crewai_tools/tools/scrapfly_scrape_website_tool/scrapfly_scrape_website_tool.py index 3c96d31afd..932b8dc7a3 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/scrapfly_scrape_website_tool/scrapfly_scrape_website_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/scrapfly_scrape_website_tool/scrapfly_scrape_website_tool.py @@ -5,6 +5,8 @@ from crewai.tools import BaseTool, EnvVar from pydantic import BaseModel, Field +from crewai_tools.security.safe_path import validate_url + logger = logging.getLogger(__file__) @@ -72,6 +74,7 @@ def _run( ) -> str | None: from scrapfly import ScrapeConfig + url = validate_url(url) scrape_config = scrape_config if scrape_config is not None else {} try: response = self.scrapfly.scrape( # type: ignore[union-attr] diff --git a/lib/crewai-tools/src/crewai_tools/tools/serper_scrape_website_tool/serper_scrape_website_tool.py b/lib/crewai-tools/src/crewai_tools/tools/serper_scrape_website_tool/serper_scrape_website_tool.py index e0e4080b4f..55521104ba 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/serper_scrape_website_tool/serper_scrape_website_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/serper_scrape_website_tool/serper_scrape_website_tool.py @@ -5,6 +5,8 @@ from pydantic import BaseModel, Field import requests +from crewai_tools.security.safe_path import validate_url + class SerperScrapeWebsiteInput(BaseModel): """Input schema for SerperScrapeWebsite.""" @@ -42,6 +44,7 @@ def _run(self, url: str, include_markdown: bool = True) -> str: Returns: Scraped website content as a string """ + 
validate_url(url) try: # Serper API endpoint api_url = "https://scrape.serper.dev" diff --git a/lib/crewai-tools/src/crewai_tools/tools/serply_api_tool/serply_webpage_to_markdown_tool.py b/lib/crewai-tools/src/crewai_tools/tools/serply_api_tool/serply_webpage_to_markdown_tool.py index f3a4729f20..4ace8b46a3 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/serply_api_tool/serply_webpage_to_markdown_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/serply_api_tool/serply_webpage_to_markdown_tool.py @@ -5,6 +5,7 @@ from pydantic import BaseModel, Field import requests +from crewai_tools.security.safe_path import validate_url from crewai_tools.tools.rag.rag_tool import RagTool @@ -48,6 +49,7 @@ def _run( # type: ignore[override] if self.proxy_location and not self.headers.get("X-Proxy-Location"): self.headers["X-Proxy-Location"] = self.proxy_location + validate_url(url) data = {"url": url, "method": "GET", "response_type": "markdown"} response = requests.request( "POST", diff --git a/lib/crewai-tools/src/crewai_tools/tools/vision_tool/vision_tool.py b/lib/crewai-tools/src/crewai_tools/tools/vision_tool/vision_tool.py index 1fa75c6883..24904c0f6b 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/vision_tool/vision_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/vision_tool/vision_tool.py @@ -7,6 +7,8 @@ from crewai.utilities.types import LLMMessage from pydantic import BaseModel, Field, PrivateAttr, field_validator +from crewai_tools.security.safe_path import validate_file_path + class ImagePromptSchema(BaseModel): """Input for Vision Tool.""" @@ -135,5 +137,6 @@ def _encode_image(image_path: str) -> str: Returns: Base64-encoded image data """ + image_path = validate_file_path(image_path) with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode() diff --git a/lib/crewai-tools/src/crewai_tools/tools/website_search/website_search_tool.py b/lib/crewai-tools/src/crewai_tools/tools/website_search/website_search_tool.py 
index 323557779e..62a6c1d702 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/website_search/website_search_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/website_search/website_search_tool.py @@ -3,6 +3,7 @@ from pydantic import BaseModel, Field from crewai_tools.rag.data_types import DataType +from crewai_tools.security.safe_path import validate_url from crewai_tools.tools.rag.rag_tool import RagTool @@ -37,6 +38,7 @@ def __init__(self, website: str | None = None, **kwargs: Any) -> None: self._generate_description() def add(self, website: str) -> None: # type: ignore[override] + website = validate_url(website) super().add(website, data_type=DataType.WEBSITE) def _run( # type: ignore[override] diff --git a/lib/crewai-tools/src/crewai_tools/utilities/safe_path.py b/lib/crewai-tools/src/crewai_tools/utilities/safe_path.py index 4dde68e128..f3ec120fde 100644 --- a/lib/crewai-tools/src/crewai_tools/utilities/safe_path.py +++ b/lib/crewai-tools/src/crewai_tools/utilities/safe_path.py @@ -1,205 +1,10 @@ -"""Path and URL validation utilities for crewai-tools. +"""Backward-compatible re-export from crewai_tools.security.safe_path.""" -Provides validation for file paths and URLs to prevent unauthorized -file access and server-side request forgery (SSRF) when tools accept -user-controlled or LLM-controlled inputs at runtime. +from crewai_tools.security.safe_path import ( + validate_directory_path, + validate_file_path, + validate_url, +) -Set CREWAI_TOOLS_ALLOW_UNSAFE_PATHS=true to bypass validation (not -recommended for production). 
-""" -from __future__ import annotations - -import ipaddress -import logging -import os -import socket -from urllib.parse import urlparse - - -logger = logging.getLogger(__name__) - -_UNSAFE_PATHS_ENV = "CREWAI_TOOLS_ALLOW_UNSAFE_PATHS" - - -def _is_escape_hatch_enabled() -> bool: - """Check if the unsafe paths escape hatch is enabled.""" - return os.environ.get(_UNSAFE_PATHS_ENV, "").lower() in ("true", "1", "yes") - - -# --------------------------------------------------------------------------- -# File path validation -# --------------------------------------------------------------------------- - - -def validate_file_path(path: str, base_dir: str | None = None) -> str: - """Validate that a file path is safe to read. - - Resolves symlinks and ``..`` components, then checks that the resolved - path falls within *base_dir* (defaults to the current working directory). - - Args: - path: The file path to validate. - base_dir: Allowed root directory. Defaults to ``os.getcwd()``. - - Returns: - The resolved, validated absolute path. - - Raises: - ValueError: If the path escapes the allowed directory. - """ - if _is_escape_hatch_enabled(): - logger.warning( - "%s is enabled — skipping file path validation for: %s", - _UNSAFE_PATHS_ENV, - path, - ) - return os.path.realpath(path) - - if base_dir is None: - base_dir = os.getcwd() - - resolved_base = os.path.realpath(base_dir) - resolved_path = os.path.realpath( - os.path.join(resolved_base, path) if not os.path.isabs(path) else path - ) - - # Ensure the resolved path is within the base directory. - # When resolved_base already ends with a separator (e.g. the filesystem - # root "/"), appending os.sep would double it ("//"), so use the base - # as-is in that case. 
- prefix = resolved_base if resolved_base.endswith(os.sep) else resolved_base + os.sep - if not resolved_path.startswith(prefix) and resolved_path != resolved_base: - raise ValueError( - f"Path '{path}' resolves to '{resolved_path}' which is outside " - f"the allowed directory '{resolved_base}'. " - f"Set {_UNSAFE_PATHS_ENV}=true to bypass this check." - ) - - return resolved_path - - -def validate_directory_path(path: str, base_dir: str | None = None) -> str: - """Validate that a directory path is safe to read. - - Same as :func:`validate_file_path` but also checks that the path - is an existing directory. - - Args: - path: The directory path to validate. - base_dir: Allowed root directory. Defaults to ``os.getcwd()``. - - Returns: - The resolved, validated absolute path. - - Raises: - ValueError: If the path escapes the allowed directory or is not a directory. - """ - validated = validate_file_path(path, base_dir) - if not os.path.isdir(validated): - raise ValueError(f"Path '{validated}' is not a directory.") - return validated - - -# --------------------------------------------------------------------------- -# URL validation -# --------------------------------------------------------------------------- - -# Private and reserved IP ranges that should not be accessed -_BLOCKED_IPV4_NETWORKS = [ - ipaddress.ip_network("10.0.0.0/8"), - ipaddress.ip_network("172.16.0.0/12"), - ipaddress.ip_network("192.168.0.0/16"), - ipaddress.ip_network("127.0.0.0/8"), - ipaddress.ip_network("169.254.0.0/16"), # Link-local / cloud metadata - ipaddress.ip_network("0.0.0.0/32"), -] - -_BLOCKED_IPV6_NETWORKS = [ - ipaddress.ip_network("::1/128"), - ipaddress.ip_network("::/128"), - ipaddress.ip_network("fc00::/7"), # Unique local addresses - ipaddress.ip_network("fe80::/10"), # Link-local IPv6 -] - - -def _is_private_or_reserved(ip_str: str) -> bool: - """Check if an IP address is private, reserved, or otherwise unsafe.""" - try: - addr = ipaddress.ip_address(ip_str) - # Unwrap 
IPv4-mapped IPv6 addresses (e.g., ::ffff:127.0.0.1) to IPv4 - # so they are only checked against IPv4 networks (avoids TypeError when - # an IPv4Address is compared against an IPv6Network). - if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped: - addr = addr.ipv4_mapped - networks = ( - _BLOCKED_IPV4_NETWORKS - if isinstance(addr, ipaddress.IPv4Address) - else _BLOCKED_IPV6_NETWORKS - ) - return any(addr in network for network in networks) - except ValueError: - return True # If we can't parse, block it - - -def validate_url(url: str) -> str: - """Validate that a URL is safe to fetch. - - Blocks ``file://`` scheme entirely. For ``http``/``https``, resolves - DNS and checks that the target IP is not private or reserved (prevents - SSRF to internal services and cloud metadata endpoints). - - Args: - url: The URL to validate. - - Returns: - The validated URL string. - - Raises: - ValueError: If the URL uses a blocked scheme or resolves to a - private/reserved IP address. - """ - if _is_escape_hatch_enabled(): - logger.warning( - "%s is enabled — skipping URL validation for: %s", - _UNSAFE_PATHS_ENV, - url, - ) - return url - - parsed = urlparse(url) - - # Block file:// scheme - if parsed.scheme == "file": - raise ValueError( - f"file:// URLs are not allowed: '{url}'. " - f"Use a file path instead, or set {_UNSAFE_PATHS_ENV}=true to bypass." - ) - - # Only allow http and https - if parsed.scheme not in ("http", "https"): - raise ValueError( - f"URL scheme '{parsed.scheme}' is not allowed. Only http and https are supported." 
- ) - - if not parsed.hostname: - raise ValueError(f"URL has no hostname: '{url}'") - - # Resolve DNS and check IPs - try: - addrinfos = socket.getaddrinfo( - parsed.hostname, parsed.port or (443 if parsed.scheme == "https" else 80) - ) - except socket.gaierror as exc: - raise ValueError(f"Could not resolve hostname: '{parsed.hostname}'") from exc - - for _family, _, _, _, sockaddr in addrinfos: - ip_str = str(sockaddr[0]) - if _is_private_or_reserved(ip_str): - raise ValueError( - f"URL '{url}' resolves to private/reserved IP {ip_str}. " - f"Access to internal networks is not allowed. " - f"Set {_UNSAFE_PATHS_ENV}=true to bypass." - ) - - return url +__all__ = ["validate_directory_path", "validate_file_path", "validate_url"] diff --git a/lib/crewai-tools/tests/utilities/test_safe_path.py b/lib/crewai-tools/tests/utilities/test_safe_path.py index 83e247292e..4fb5d1ec7a 100644 --- a/lib/crewai-tools/tests/utilities/test_safe_path.py +++ b/lib/crewai-tools/tests/utilities/test_safe_path.py @@ -6,7 +6,7 @@ import pytest -from crewai_tools.utilities.safe_path import ( +from crewai_tools.security.safe_path import ( validate_directory_path, validate_file_path, validate_url,