Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,7 @@ tks-trigger-cache.json
selector-cache.txt
*.cache
.uv/

# File-backed disk caches (utils/disk_cache.py)
source-cache/
label-cache/
10 changes: 8 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
"""

import os
from pathlib import Path

import pytest


@pytest.fixture(autouse=True)
def _isolate_from_live_apis(monkeypatch: pytest.MonkeyPatch) -> None:
def _isolate_from_live_apis(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""Block accidental live API/RPC calls and reset cross-test singletons.

Strips `ETHERSCAN_TOKEN` and every `PROVIDER_URL_*` so a missing mock
Expand All @@ -23,11 +24,16 @@ def _isolate_from_live_apis(monkeypatch: pytest.MonkeyPatch) -> None:
those code paths opt back in via @patch.dict.

Also clears `ChainManager._instances` so a real client object cached by
one test can't leak into the next.
one test can't leak into the next, and points `CACHE_DIR` at a per-test
temp dir so the file-backed disk caches (utils.disk_cache) never litter the
repo and never leak entries between tests.
"""
for key in list(os.environ):
if key == "ETHERSCAN_TOKEN" or key.startswith("PROVIDER_URL_"):
monkeypatch.delenv(key, raising=False)
# `cache_path` reads this module global at call time, so the redirect takes
# effect for caches created at import (they resolve their dir lazily).
monkeypatch.setattr("utils.cache.CACHE_DIR", str(tmp_path))
try:
from utils.web3_wrapper import ChainManager

Expand Down
114 changes: 114 additions & 0 deletions tests/test_disk_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""Tests for utils/disk_cache.py.

These rely on the autouse conftest fixture that redirects CACHE_DIR to a
per-test temp dir, so every DiskCache here writes under an isolated location.
"""

import os
import unittest
from unittest.mock import patch

from utils.disk_cache import MISS, DiskCache


class TestDiskCacheRoundtrip(unittest.TestCase):
def test_positive_roundtrip(self) -> None:
cache = DiskCache(namespace="rt")
cache.set_positive("k", {"a": 1, "b": ["x", "y"]})
self.assertEqual(cache.get("k"), {"a": 1, "b": ["x", "y"]})

def test_absent_key_returns_miss(self) -> None:
cache = DiskCache(namespace="rt")
self.assertIs(cache.get("nope"), MISS)

def test_negative_value_none_is_distinct_from_miss(self) -> None:
cache = DiskCache(namespace="neg")
cache.set_negative("k") # stores value None
self.assertIsNone(cache.get("k")) # a cached negative, not MISS
self.assertIs(cache.get("other"), MISS)

def test_empty_list_negative_roundtrips(self) -> None:
cache = DiskCache(namespace="neg")
cache.set_negative("k", [])
self.assertEqual(cache.get("k"), [])

def test_clear_removes_entries(self) -> None:
cache = DiskCache(namespace="clr")
cache.set_positive("a", 1)
cache.set_positive("b", 2)
cache.clear()
self.assertIs(cache.get("a"), MISS)
self.assertIs(cache.get("b"), MISS)


class TestDiskCacheTTL(unittest.TestCase):
def test_negative_entry_expires(self) -> None:
cache = DiskCache(namespace="ttl", negative_ttl=10)
with patch("utils.disk_cache.time.time") as mock_time:
mock_time.return_value = 1000.0
cache.set_negative("k")
mock_time.return_value = 1005.0 # within TTL
self.assertIsNone(cache.get("k"))
mock_time.return_value = 1011.0 # past TTL
self.assertIs(cache.get("k"), MISS)

def test_positive_entry_never_expires(self) -> None:
cache = DiskCache(namespace="ttl")
with patch("utils.disk_cache.time.time") as mock_time:
mock_time.return_value = 1000.0
cache.set_positive("k", "v")
mock_time.return_value = 1000.0 + 10**9 # far future
self.assertEqual(cache.get("k"), "v")


class TestDiskCacheEviction(unittest.TestCase):
def test_evicts_to_max_entries_keeping_newest(self) -> None:
cache = DiskCache(namespace="evict", max_entries=2)
cache.set_positive("a", 1)
cache.set_positive("b", 2)
# Force deterministic mtime ordering (a oldest) regardless of FS resolution.
os.utime(cache._path("a"), (100, 100))
os.utime(cache._path("b"), (200, 200))
cache.set_positive("c", 3) # fresh mtime; eviction drops the oldest ("a")

self.assertIs(cache.get("a"), MISS) # evicted
self.assertEqual(cache.get("b"), 2)
self.assertEqual(cache.get("c"), 3)

def test_read_refreshes_lru_recency(self) -> None:
# Reading "a" before inserting "c" must keep "a" and evict the unread "b",
# even though "a" was written first. (Guards against FIFO-by-write-time.)
cache = DiskCache(namespace="lru", max_entries=2)
cache.set_positive("a", 1)
cache.set_positive("b", 2)
os.utime(cache._path("a"), (100, 100)) # a oldest-written
os.utime(cache._path("b"), (200, 200)) # b newer
cache.get("a") # LRU touch bumps "a" above the unread "b"
cache.set_positive("c", 3) # over cap → evict least-recently-used ("b")

self.assertEqual(cache.get("a"), 1) # kept: recently read
self.assertIs(cache.get("b"), MISS) # evicted: never read, oldest use
self.assertEqual(cache.get("c"), 3)

def test_evicts_to_max_bytes(self) -> None:
big = "x" * 2000
cache = DiskCache(namespace="bytes", max_bytes=3000)
cache.set_positive("a", big)
os.utime(cache._path("a"), (100, 100)) # mark "a" oldest before "b" triggers eviction
cache.set_positive("b", big) # two ~2KB entries exceed the 3KB cap → "a" dropped
self.assertIs(cache.get("a"), MISS)
self.assertEqual(cache.get("b"), big)


class TestDiskCacheResilience(unittest.TestCase):
def test_corrupt_file_is_a_miss(self) -> None:
cache = DiskCache(namespace="corrupt")
cache.set_positive("k", "v")
# Overwrite with garbage.
with open(cache._path("k"), "w") as f:
f.write("{not json")
self.assertIs(cache.get("k"), MISS)


if __name__ == "__main__":
unittest.main()
43 changes: 43 additions & 0 deletions tests/test_source_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,49 @@ def test_caches_per_address(self, mock_fetch: object) -> None:
# Two calls — Etherscan should be hit only once
self.assertEqual(mock_fetch.call_count, 1) # type: ignore[attr-defined]

@patch.dict("os.environ", {"ETHERSCAN_TOKEN": "test-key"})
@patch("utils.source_context.fetch_json")
def test_verified_source_persists_across_process_restart(self, mock_fetch: object) -> None:
# A persistent disk cache should serve the same address after the in-memory
# layer is dropped (reset_cache simulates a fresh cron process).
mock_fetch.return_value = { # type: ignore[attr-defined]
"status": "1",
"result": [{"SourceCode": INFINIFI_FARM_SOURCE, "ContractName": "Farm"}],
}
self.assertIsNotNone(get_source_context(1, "0xabc", "setMaxSlippage"))
reset_cache() # clears in-memory only; disk cache survives
ctx = get_source_context(1, "0xabc", "setMaxSlippage")
self.assertIsNotNone(ctx)
self.assertEqual(mock_fetch.call_count, 1) # type: ignore[attr-defined] # served from disk

@patch.dict("os.environ", {"ETHERSCAN_TOKEN": "test-key"})
@patch("utils.source_context.fetch_json")
def test_unverified_negative_persists_across_process_restart(self, mock_fetch: object) -> None:
mock_fetch.return_value = { # type: ignore[attr-defined]
"status": "1",
"result": [{"SourceCode": "", "ContractName": ""}],
}
self.assertIsNone(get_source_context(1, "0xabc", "setMaxSlippage"))
reset_cache()
self.assertIsNone(get_source_context(1, "0xabc", "setMaxSlippage"))
self.assertEqual(mock_fetch.call_count, 1) # type: ignore[attr-defined] # negative cached on disk

@patch.dict("os.environ", {"ETHERSCAN_TOKEN": "test-key"})
@patch("utils.source_context.fetch_json")
def test_transient_error_is_not_persisted(self, mock_fetch: object) -> None:
# A request failure (fetch_json -> None) must not be cached as "unverified".
mock_fetch.return_value = None # type: ignore[attr-defined]
self.assertIsNone(get_source_context(1, "0xabc", "setMaxSlippage"))
reset_cache()
# Etherscan recovers: a later run should re-fetch and succeed, proving the
# blip was never persisted.
mock_fetch.return_value = { # type: ignore[attr-defined]
"status": "1",
"result": [{"SourceCode": INFINIFI_FARM_SOURCE, "ContractName": "Farm"}],
}
self.assertIsNotNone(get_source_context(1, "0xabc", "setMaxSlippage"))
self.assertEqual(mock_fetch.call_count, 2) # type: ignore[attr-defined]

@patch.dict("os.environ", {"ETHERSCAN_TOKEN": "test-key"})
@patch("utils.proxy.get_current_implementation")
@patch("utils.source_context.fetch_json")
Expand Down
32 changes: 32 additions & 0 deletions tests/test_swiss_knife.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,38 @@ def test_caches_repeat_lookups(self, mock_fetch: object) -> None:
fetch_swiss_knife_labels(addr, 1)
self.assertEqual(mock_fetch.call_count, 1) # type: ignore[attr-defined]

@patch("utils.swiss_knife.fetch_json")
def test_labels_persist_across_process_restart(self, mock_fetch: object) -> None:
# Disk cache should serve labels after the in-memory layer is dropped.
mock_fetch.return_value = ["Curve.fi: 3pool"] # type: ignore[attr-defined]
addr = "0x" + "d0" * 20
fetch_swiss_knife_labels(addr, 1)
reset_cache() # clears in-memory only
self.assertEqual(fetch_swiss_knife_labels(addr, 1), ["Curve.fi: 3pool"])
self.assertEqual(mock_fetch.call_count, 1) # type: ignore[attr-defined] # served from disk

@patch("utils.swiss_knife.fetch_json")
def test_empty_negative_persists_across_process_restart(self, mock_fetch: object) -> None:
# An unknown address (dict error body = a real 200 response) is cached as
# an empty negative so we don't re-query it every run.
mock_fetch.return_value = {"error": "Error fetching data"} # type: ignore[attr-defined]
addr = "0x" + "e0" * 20
self.assertEqual(fetch_swiss_knife_labels(addr, 1), [])
reset_cache()
self.assertEqual(fetch_swiss_knife_labels(addr, 1), [])
self.assertEqual(mock_fetch.call_count, 1) # type: ignore[attr-defined] # negative cached on disk

@patch("utils.swiss_knife.fetch_json")
def test_transient_error_is_not_persisted(self, mock_fetch: object) -> None:
# fetch_json -> None is a network/HTTP failure, not "no labels"; never persist.
mock_fetch.return_value = None # type: ignore[attr-defined]
addr = "0x" + "f0" * 20
self.assertEqual(fetch_swiss_knife_labels(addr, 1), [])
reset_cache()
mock_fetch.return_value = ["Aave: Pool"] # type: ignore[attr-defined]
self.assertEqual(fetch_swiss_knife_labels(addr, 1), ["Aave: Pool"])
self.assertEqual(mock_fetch.call_count, 2) # type: ignore[attr-defined]


class TestPickDisplayName(unittest.TestCase):
"""Sanity-check that we only use Swiss Knife's first label when it looks like a name."""
Expand Down
Loading