Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Comment thread
theCyberTech marked this conversation as resolved.
Comment thread
theCyberTech marked this conversation as resolved.
Comment thread
theCyberTech marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,19 @@ def _run(
"tar.xz": self._compress_tar,
}
if format == "zip":
format_compression[format](input_path, output_path) # type: ignore[operator]
skipped = format_compression[format](input_path, output_path) # type: ignore[operator]
else:
format_compression[format](input_path, output_path, format) # type: ignore[operator]

return f"Successfully compressed '{input_path}' into '{output_path}'"
skipped = format_compression[format]( # type: ignore[operator]
input_path, output_path, format
)

message = f"Successfully compressed '{input_path}' into '{output_path}'"
if isinstance(skipped, list) and skipped:
message += (
f" ({len(skipped)} item(s) skipped for safety: resolved "
f"outside the allowed directory)"
)
Comment thread
theCyberTech marked this conversation as resolved.
return message
Comment thread
theCyberTech marked this conversation as resolved.
except FileNotFoundError:
return f"Error: File not found at path: {input_path}"
except PermissionError:
Expand Down Expand Up @@ -109,21 +117,45 @@ def _prepare_output(output_path: str, overwrite: bool) -> bool:
return True

@staticmethod
def _compress_zip(input_path: str, output_path: str) -> None:
"""Compresses input into a zip archive."""
def _compress_zip(input_path: str, output_path: str) -> list[str]:
"""Compresses input into a zip archive.

Returns the files skipped because they resolved outside the allow-list.
``zipfile.write`` dereferences symlinks, so each walked file is
validated to stop a symlink inside the tree from copying the contents
of an out-of-tree target (e.g. ``~/.ssh/id_rsa``) into the archive.
"""
# Defense in depth: validate the write target at the sink, so this is
# safe even if called directly rather than through _run.
output_path = validate_file_path(output_path)
input_path = validate_file_path(input_path)
skipped: list[str] = []
with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zipf:
if os.path.isfile(input_path):
zipf.write(input_path, os.path.basename(input_path))
else:
for root, _, files in os.walk(input_path):
for file in files:
full_path = os.path.join(root, file)
try:
resolved_path = validate_file_path(full_path)
except ValueError:
skipped.append(full_path)
continue
arcname = os.path.relpath(full_path, start=input_path)
zipf.write(full_path, arcname)
zipf.write(resolved_path, arcname)
return skipped

@staticmethod
def _compress_tar(input_path: str, output_path: str, format: str) -> None:
"""Compresses input into a tar archive with the given format."""
def _compress_tar(input_path: str, output_path: str, format: str) -> list[str]:
"""Compresses input into a tar archive with the given format.

Returns the members skipped for safety. ``tarfile`` stores symlinks and
hardlinks as link entries rather than dereferencing them, so the
compress-time content leak that affects zip does not apply here. The
filter drops link members so an out-of-tree symlink target cannot be
shipped inside the archive and resolved at extraction time.
"""
format_mode = {
"tar": "w",
"tar.gz": "w:gz",
Expand All @@ -135,7 +167,19 @@ def _compress_tar(input_path: str, output_path: str, format: str) -> None:
raise ValueError(f"Unsupported tar format: {format}")

mode = format_mode[format]
skipped: list[str] = []

def _drop_links(tarinfo: tarfile.TarInfo) -> tarfile.TarInfo | None:
if tarinfo.issym() or tarinfo.islnk():
skipped.append(tarinfo.name)
return None
return tarinfo

# Defense in depth: validate the write target at the sink, so this is
# safe even if called directly rather than through _run.
output_path = validate_file_path(output_path)
input_path = validate_file_path(input_path)
with tarfile.open(output_path, mode) as tarf: # type: ignore[call-overload]
arcname = os.path.basename(input_path)
tarf.add(input_path, arcname=arcname)
tarf.add(input_path, arcname=arcname, filter=_drop_links)
return skipped
111 changes: 110 additions & 1 deletion lib/crewai-tools/tests/tools/files_compressor_tool_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import os
import shutil
import tarfile
import tempfile
import zipfile
from unittest.mock import patch

from crewai_tools.tools.files_compressor_tool import FileCompressorTool
from crewai_tools.tools.files_compressor_tool.files_compressor_tool import (
FileCompressorTool,
)
import pytest


Expand Down Expand Up @@ -129,3 +136,105 @@ def test_prepare_output_makes_dir(mock_exists, mock_makedirs):
result = tool._prepare_output("some/missing/path/file.zip", overwrite=True)
assert result is True
mock_makedirs.assert_called_once()


# --- Security: symlink content must not leak out of the allow-list ---


@pytest.fixture
def symlink_env():
"""A working dir (the allowed root, via cwd) containing a normal file and a
symlink pointing at a file OUTSIDE that root.

The working directory is the allowed root for path validation, so we chdir
into ``work_dir`` rather than relying on CREWAI_TOOLS_ALLOWED_DIRS. This
keeps the test independent of the configurable-allow-list feature and
exercises the default cwd confinement.
"""
work_dir = os.path.realpath(tempfile.mkdtemp())
outside_dir = os.path.realpath(tempfile.mkdtemp()) # outside the allowed root
outside_file = os.path.join(outside_dir, "outside.txt")
with open(outside_file, "w") as f:
f.write("OUTSIDE_FILE_MARKER")

src = os.path.join(work_dir, "src")
os.makedirs(src)
with open(os.path.join(src, "normal.txt"), "w") as f:
f.write("safe content")
os.symlink(outside_file, os.path.join(src, "leak.txt"))

prev_cwd = os.getcwd()
os.chdir(work_dir)
yield {
"work_dir": work_dir,
"src": src,
"outside_dir": outside_dir,
"outside_file": outside_file,
}
os.chdir(prev_cwd)
shutil.rmtree(work_dir, ignore_errors=True)
shutil.rmtree(outside_dir, ignore_errors=True)


def test_zip_excludes_symlink_to_outside_file(tool, symlink_env):
out = os.path.join(symlink_env["work_dir"], "archive.zip")
result = tool._run(
input_path=symlink_env["src"], output_path=out, format="zip", overwrite=True
)
assert "Successfully compressed" in result
assert "skipped for safety" in result

with zipfile.ZipFile(out) as zf:
names = zf.namelist()
assert "normal.txt" in names
assert "leak.txt" not in names
blob = b"".join(zf.read(n) for n in names)
assert b"OUTSIDE_FILE_MARKER" not in blob


def test_tar_excludes_symlink_to_outside_file(tool, symlink_env):
out = os.path.join(symlink_env["work_dir"], "archive.tar.gz")
result = tool._run(
input_path=symlink_env["src"],
output_path=out,
format="tar.gz",
overwrite=True,
)
assert "Successfully compressed" in result
assert "skipped for safety" in result

with tarfile.open(out) as tf:
members = tf.getnames()
assert any(m.endswith("normal.txt") for m in members)
assert not any(m.endswith("leak.txt") for m in members)
assert all(not (tf.getmember(m).issym() or tf.getmember(m).islnk()) for m in members)


def test_compress_zip_validates_output_path_at_sink(symlink_env):
# Calling the compressor directly (bypassing _run) must still refuse to
# write outside the allow-list.
outside = os.path.join(symlink_env["outside_dir"], "evil.zip")
with pytest.raises(ValueError, match="outside the allowed director"):
FileCompressorTool._compress_zip(symlink_env["src"], outside)
assert not os.path.exists(outside)


def test_compress_tar_validates_output_path_at_sink(symlink_env):
outside = os.path.join(symlink_env["outside_dir"], "evil.tar.gz")
with pytest.raises(ValueError, match="outside the allowed director"):
FileCompressorTool._compress_tar(symlink_env["src"], outside, "tar.gz")
assert not os.path.exists(outside)


def test_compress_zip_validates_input_path_at_sink(symlink_env):
out = os.path.join(symlink_env["work_dir"], "archive.zip")
with pytest.raises(ValueError, match="outside the allowed director"):
FileCompressorTool._compress_zip(symlink_env["outside_file"], out)
assert not os.path.exists(out)


def test_compress_tar_validates_input_path_at_sink(symlink_env):
out = os.path.join(symlink_env["work_dir"], "archive.tar.gz")
with pytest.raises(ValueError, match="outside the allowed director"):
FileCompressorTool._compress_tar(symlink_env["outside_file"], out, "tar.gz")
assert not os.path.exists(out)
Loading