Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
205 changes: 205 additions & 0 deletions lib/crewai-tools/src/crewai_tools/security/safe_path.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
"""Path and URL validation utilities for crewai-tools.

Provides validation for file paths and URLs to prevent unauthorized
file access and server-side request forgery (SSRF) when tools accept
user-controlled or LLM-controlled inputs at runtime.

Set CREWAI_TOOLS_ALLOW_UNSAFE_PATHS=true to bypass validation (not
recommended for production).
"""

from __future__ import annotations

import ipaddress
import logging
import os
import socket
from urllib.parse import urlparse


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Name of the environment variable that, when set to a truthy value,
# bypasses the path and URL validation performed in this module.
_UNSAFE_PATHS_ENV = "CREWAI_TOOLS_ALLOW_UNSAFE_PATHS"


def _is_escape_hatch_enabled() -> bool:
    """Report whether validation has been switched off via the environment.

    Returns:
        True when the variable named by ``_UNSAFE_PATHS_ENV`` is set to
        ``true``, ``1`` or ``yes`` (compared case-insensitively); False when
        it is unset or holds any other value.
    """
    raw_value = os.environ.get(_UNSAFE_PATHS_ENV, "")
    truthy_values = ("true", "1", "yes")
    return raw_value.lower() in truthy_values


# ---------------------------------------------------------------------------
# File path validation
# ---------------------------------------------------------------------------


def validate_file_path(path: str, base_dir: str | None = None) -> str:
    """Validate that a file path is safe to read.

    Resolves symlinks and ``..`` components, then checks that the resolved
    path falls within *base_dir* (defaults to the current working directory).

    Relative paths are always interpreted relative to *base_dir*, including
    when the escape hatch is enabled, so a given input resolves to the same
    file whether or not the containment check is bypassed.

    Args:
        path: The file path to validate.
        base_dir: Allowed root directory. Defaults to ``os.getcwd()``.

    Returns:
        The resolved, validated absolute path.

    Raises:
        ValueError: If the path escapes the allowed directory.
    """
    if base_dir is None:
        base_dir = os.getcwd()

    resolved_base = os.path.realpath(base_dir)
    # Absolute inputs are resolved as given; relative inputs are anchored at
    # the (already resolved) base directory rather than the process CWD.
    candidate = path if os.path.isabs(path) else os.path.join(resolved_base, path)
    resolved_path = os.path.realpath(candidate)

    if _is_escape_hatch_enabled():
        # Only the containment check is skipped; resolution above is shared
        # with the validated code path.
        logger.warning(
            "%s is enabled — skipping file path validation for: %s",
            _UNSAFE_PATHS_ENV,
            path,
        )
        return resolved_path

    # Ensure the resolved path is within the base directory.
    # When resolved_base already ends with a separator (e.g. the filesystem
    # root "/"), appending os.sep would double it ("//"), so use the base
    # as-is in that case. Comparing against "base + separator" (instead of
    # just "base") stops "/data-private" from matching a base of "/data".
    prefix = resolved_base if resolved_base.endswith(os.sep) else resolved_base + os.sep
    if not resolved_path.startswith(prefix) and resolved_path != resolved_base:
        raise ValueError(
            f"Path '{path}' resolves to '{resolved_path}' which is outside "
            f"the allowed directory '{resolved_base}'. "
            f"Set {_UNSAFE_PATHS_ENV}=true to bypass this check."
        )

    return resolved_path


def validate_directory_path(path: str, base_dir: str | None = None) -> str:
    """Validate that a directory path is safe to read.

    Delegates the containment check to :func:`validate_file_path` and then
    additionally requires the resolved path to be an existing directory.

    Args:
        path: The directory path to validate.
        base_dir: Allowed root directory. Defaults to ``os.getcwd()``.

    Returns:
        The resolved, validated absolute path.

    Raises:
        ValueError: If the path escapes the allowed directory or is not a directory.
    """
    resolved_dir = validate_file_path(path, base_dir)
    if os.path.isdir(resolved_dir):
        return resolved_dir
    raise ValueError(f"Path '{resolved_dir}' is not a directory.")


# ---------------------------------------------------------------------------
# URL validation
# ---------------------------------------------------------------------------

# Private, loopback, link-local and other special-purpose IP ranges that
# should not be accessed through a user- or LLM-supplied URL.
_BLOCKED_IPV4_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),  # RFC 1918 private
    ipaddress.ip_network("172.16.0.0/12"),  # RFC 1918 private
    ipaddress.ip_network("192.168.0.0/16"),  # RFC 1918 private
    ipaddress.ip_network("127.0.0.0/8"),  # Loopback
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local / cloud metadata
    ipaddress.ip_network("0.0.0.0/8"),  # "This network", includes 0.0.0.0
    ipaddress.ip_network("100.64.0.0/10"),  # RFC 6598 shared address space (CGNAT)
    ipaddress.ip_network("192.0.0.0/24"),  # IETF protocol assignments
    ipaddress.ip_network("198.18.0.0/15"),  # Network benchmarking
    ipaddress.ip_network("224.0.0.0/4"),  # Multicast
    ipaddress.ip_network("240.0.0.0/4"),  # Reserved, includes limited broadcast
]

_BLOCKED_IPV6_NETWORKS = [
    ipaddress.ip_network("::1/128"),  # Loopback
    ipaddress.ip_network("::/128"),  # Unspecified address
    ipaddress.ip_network("fc00::/7"),  # Unique local addresses
    ipaddress.ip_network("fe80::/10"),  # Link-local IPv6
    ipaddress.ip_network("fec0::/10"),  # Deprecated site-local addresses
    ipaddress.ip_network("ff00::/8"),  # Multicast
]


def _is_private_or_reserved(ip_str: str) -> bool:
    """Decide whether an IP address string falls in a blocked network.

    Args:
        ip_str: Textual IPv4 or IPv6 address.

    Returns:
        True if the address is inside one of the blocked networks for its
        address family, or if the string cannot be parsed as an IP address
        (unparseable input is blocked); False otherwise.
    """
    try:
        candidate = ipaddress.ip_address(ip_str)
    except ValueError:
        # Fail closed: anything that is not a valid address is blocked.
        return True

    # An IPv4-mapped IPv6 address (e.g. ::ffff:127.0.0.1) is judged by the
    # IPv4 address it wraps, so it is matched against the IPv4 table.
    if candidate.version == 6 and candidate.ipv4_mapped:
        candidate = candidate.ipv4_mapped

    if candidate.version == 4:
        blocklist = _BLOCKED_IPV4_NETWORKS
    else:
        blocklist = _BLOCKED_IPV6_NETWORKS

    for blocked_net in blocklist:
        if candidate in blocked_net:
            return True
    return False


def validate_url(url: str) -> str:
    """Validate that a URL is safe to fetch.

    Blocks ``file://`` scheme entirely. For ``http``/``https``, resolves
    DNS and checks that the target IP is not private or reserved (prevents
    SSRF to internal services and cloud metadata endpoints).

    Note:
        The check reflects DNS at the time of the call and only the URL
        string is returned; whatever client later fetches the URL performs
        its own name resolution.

    Args:
        url: The URL to validate.

    Returns:
        The validated URL string.

    Raises:
        ValueError: If the URL uses a blocked scheme, has no hostname,
            cannot be resolved, or resolves to a private/reserved IP address.
    """
    if _is_escape_hatch_enabled():
        logger.warning(
            "%s is enabled — skipping URL validation for: %s",
            _UNSAFE_PATHS_ENV,
            url,
        )
        return url

    parsed = urlparse(url)

    # Block file:// scheme (dedicated message pointing at the path API).
    if parsed.scheme == "file":
        raise ValueError(
            f"file:// URLs are not allowed: '{url}'. "
            f"Use a file path instead, or set {_UNSAFE_PATHS_ENV}=true to bypass."
        )

    # Only allow http and https
    if parsed.scheme not in ("http", "https"):
        raise ValueError(
            f"URL scheme '{parsed.scheme}' is not allowed. Only http and https are supported."
        )

    if not parsed.hostname:
        raise ValueError(f"URL has no hostname: '{url}'")

    # Resolve DNS and check IPs. ``parsed.port`` itself raises ValueError for
    # a malformed or out-of-range port, which already matches this function's
    # contract.
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        addrinfos = socket.getaddrinfo(parsed.hostname, port)
    except (socket.gaierror, UnicodeError) as exc:
        # UnicodeError comes from IDNA encoding of the hostname (e.g. an
        # empty or over-long label); report it like any other lookup failure.
        raise ValueError(f"Could not resolve hostname: '{parsed.hostname}'") from exc

    # Fail closed: with no addresses there is nothing to vet, so the URL
    # must not be approved.
    if not addrinfos:
        raise ValueError(f"Could not resolve hostname: '{parsed.hostname}'")

    # Every resolved address must be acceptable, not just the first one.
    for _family, _, _, _, sockaddr in addrinfos:
        ip_str = str(sockaddr[0])
        if _is_private_or_reserved(ip_str):
            raise ValueError(
                f"URL '{url}' resolves to private/reserved IP {ip_str}. "
                f"Access to internal networks is not allowed. "
                f"Set {_UNSAFE_PATHS_ENV}=true to bypass."
            )

    return url
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from pydantic import BaseModel, Field
import requests

from crewai_tools.security.safe_path import validate_url


class BrightDataConfig(BaseModel):
API_URL: str = "https://api.brightdata.com/request"
Expand Down Expand Up @@ -134,6 +136,7 @@ def _run(
"Content-Type": "application/json",
}

validate_url(url)
try:
response = requests.post(
self.base_url, json=payload, headers=headers, timeout=30
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from crewai.tools import BaseTool
from pydantic import BaseModel, Field

from crewai_tools.security.safe_path import validate_file_path


class ContextualAICreateAgentSchema(BaseModel):
"""Schema for contextual create agent tool."""
Expand Down Expand Up @@ -47,6 +49,7 @@ def _run(
document_paths: list[str],
) -> str:
"""Create a complete RAG pipeline with documents."""
resolved_paths = [validate_file_path(doc_path) for doc_path in document_paths]
try:
import os

Expand All @@ -56,7 +59,7 @@ def _run(

# Upload documents
document_ids = []
for doc_path in document_paths:
for doc_path in resolved_paths:
if not os.path.exists(doc_path):
raise FileNotFoundError(f"Document not found: {doc_path}")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from crewai.tools import BaseTool
from pydantic import BaseModel, Field

from crewai_tools.security.safe_path import validate_file_path


class ContextualAIParseSchema(BaseModel):
"""Schema for contextual parse tool."""
Expand Down Expand Up @@ -45,6 +47,7 @@ def _run(
"""Parse a document using Contextual AI's parser."""
if output_types is None:
output_types = ["markdown-per-page"]
file_path = validate_file_path(file_path)
try:
import json
import os
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from crewai.tools import BaseTool
from pydantic import BaseModel, Field

from crewai_tools.security.safe_path import validate_directory_path


class FixedDirectoryReadToolSchema(BaseModel):
"""Input for DirectoryReadTool."""
Expand Down Expand Up @@ -39,6 +41,7 @@ def _run(
if directory is None:
raise ValueError("Directory must be provided.")

directory = validate_directory_path(directory)
if directory[-1] == "/":
directory = directory[:-1]
files_list = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from pydantic import BaseModel, Field

from crewai_tools.rag.data_types import DataType
from crewai_tools.security.safe_path import validate_directory_path
from crewai_tools.tools.rag.rag_tool import RagTool
from crewai_tools.utilities.safe_path import validate_directory_path


class FixedDirectorySearchToolSchema(BaseModel):
Expand Down Expand Up @@ -38,7 +38,7 @@ def __init__(self, directory: str | None = None, **kwargs: Any) -> None:
self._generate_description()

def add(self, directory: str) -> None: # type: ignore[override]
validate_directory_path(directory)
directory = validate_directory_path(directory)
super().add(directory, data_type=DataType.DIRECTORY)

def _run( # type: ignore[override]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from crewai.tools import BaseTool
from pydantic import BaseModel, Field

from crewai_tools.security.safe_path import validate_file_path


class FileReadToolSchema(BaseModel):
"""Input for FileReadTool."""
Expand Down Expand Up @@ -76,6 +78,7 @@ def _run(
if file_path is None:
return "Error: No file path provided. Please provide a file path either in the constructor or as an argument."

file_path = validate_file_path(file_path)
try:
with open(file_path, "r") as file:
if start_line == 1 and line_count is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from crewai.tools import BaseTool
from pydantic import BaseModel, Field

from crewai_tools.security.safe_path import validate_file_path


class FileCompressorToolInput(BaseModel):
"""Input schema for FileCompressorTool."""
Expand Down Expand Up @@ -40,12 +42,15 @@ def _run(
overwrite: bool = False,
format: str = "zip",
) -> str:
input_path = validate_file_path(input_path)
if not os.path.exists(input_path):
return f"Input path '{input_path}' does not exist."

if not output_path:
output_path = self._generate_output_path(input_path, format)

output_path = validate_file_path(output_path)

format_extension = {
"zip": ".zip",
"tar": ".tar",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from crewai.tools import BaseTool, EnvVar
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr

from crewai_tools.security.safe_path import validate_url


try:
from firecrawl import FirecrawlApp # type: ignore[import-untyped]
Expand Down Expand Up @@ -106,6 +108,7 @@ def _run(self, url: str) -> Any:
if not self._firecrawl:
raise RuntimeError("FirecrawlApp not properly initialized")

url = validate_url(url)
return self._firecrawl.crawl(url=url, poll_interval=2, **self.config)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from crewai.tools import BaseTool, EnvVar
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr

from crewai_tools.security.safe_path import validate_url


try:
from firecrawl import FirecrawlApp # type: ignore[import-untyped]
Expand Down Expand Up @@ -106,6 +108,7 @@ def _run(self, url: str) -> Any:
if not self._firecrawl:
raise RuntimeError("FirecrawlApp not properly initialized")

url = validate_url(url)
return self._firecrawl.scrape(url=url, **self.config)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from crewai.tools import BaseTool, EnvVar
from pydantic import BaseModel, Field

from crewai_tools.security.safe_path import validate_url


class HyperbrowserLoadToolSchema(BaseModel):
url: str = Field(description="Website URL")
Expand Down Expand Up @@ -119,6 +121,7 @@ def _run(
) from e

params = self._prepare_params(params)
url = validate_url(url)

if operation == "scrape":
scrape_params = StartScrapeJobParams(url=url, **params)
Expand Down
Loading
Loading