Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 36 additions & 26 deletions lago_python_client/services/request.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Any, Callable, Optional, Union
import time

try:
from typing import Final
Expand Down Expand Up @@ -59,13 +60,8 @@ def _create_retry_wrapper(
http_method: Callable[..., httpx.Response],
) -> Callable[..., httpx.Response]:
"""
Create a wrapper around an httpx HTTP method that retries on rate limit (429).

Args:
http_method: The httpx HTTP method to wrap (e.g., httpx.get, httpx.post).

Returns:
A wrapped version of the HTTP method that handles rate limit retries.
Create a wrapper around an httpx HTTP method that retries on rate limit (429)
AND transient network/server errors (Exponential Backoff).
"""

def retry_wrapper(
Expand All @@ -74,30 +70,44 @@ def retry_wrapper(
rate_limit_retry_config: Optional[RateLimitRetryConfig] = None,
**kwargs: Any,
) -> httpx.Response:
"""
Execute HTTP request with automatic retry on rate limit (429) responses.

Args:
url: The request URL.
rate_limit_retry_config: Configuration for rate limit retry behavior.
Defaults to a configuration with max_retries=3.
**kwargs: Additional arguments to pass to the underlying httpx method.

Returns:
The HTTP response.

Raises:
LagoRateLimitError: If rate limit is exceeded and retries exhausted.
httpx.RequestError: For other HTTP errors.
"""
if rate_limit_retry_config is None:
rate_limit_retry_config = RateLimitRetryConfig()

method_name = http_method.__name__.upper()

retry_attempt = 0

# --- ASHFIR V3 INSPIRED NETWORK RETRY CONFIG ---
network_retry_attempt = 0
max_network_retries = 3
network_delay = 1.0 # İlk hata sonrası 1 saniye bekle
# -----------------------------------------------

while True:
response = http_method(url, **kwargs)
try:
response = http_method(url, **kwargs)

# Sunucu geçici hata verdiyse (502, 503, 504), manuel olarak HTTPStatusError fırlat
if response.status_code in [502, 503, 504]:
raise httpx.HTTPStatusError(
message=f"Server error: {response.status_code}",
request=response.request,
response=response,
)

except (httpx.RequestError, httpx.TimeoutException, httpx.HTTPStatusError) as e:
# Eğer hata bir HTTPStatusError ise ve 429 (Rate Limit) ise üstteki mantığı bozma, aşağı pasla
if isinstance(e, httpx.HTTPStatusError) and e.response.status_code == 429:
response = e.response
else:
# Genel ağ hatası veya sunucu çökmesi durumu: Exponential Backoff'u tetikle!
if network_retry_attempt < max_network_retries:
time.sleep(network_delay)
network_delay *= 2 # Üstel geri çekilme (1s -> 2s -> 4s)
network_retry_attempt += 1
retry_attempt += 1 # Rate limit takibi istatistiklerini yanıltmamak için senkronize et
continue
else:
raise e # Denemeler tükendiyse hatayı fırlat

# If not rate limited, emit observability info and return the response
if not is_rate_limit_response(response):
Expand All @@ -116,7 +126,7 @@ def retry_wrapper(
retry_attempt,
)

# Wait and retry
# Wait and retry (Rate limit için)
wait_for_retry(wait_duration)
retry_attempt += 1

Expand Down
76 changes: 76 additions & 0 deletions tests/test_rate_limit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os

import httpx
import pytest
from pytest_httpx import HTTPXMock

Expand Down Expand Up @@ -600,3 +601,78 @@ def test_silent_when_usage_pct_unavailable(self, caplog):
observer(self._info(limit=None, remaining=None))

assert caplog.records == []


class TestNetworkRetryResiliency:
def test_network_error_retries_and_exhausts(self, httpx_mock: HTTPXMock, monkeypatch):
slept_durations = []
monkeypatch.setattr("lago_python_client.services.request.time.sleep", slept_durations.append)

client = Client(api_key="test-key")
request_id = "test-id"

# Queue 4 errors (1 initial + 3 retries) so it raises on exhaustion
for _ in range(4):
httpx_mock.add_exception(
httpx.ConnectError("Connection refused"),
method="GET",
url=ENDPOINT + f"/{request_id}",
)

with pytest.raises(httpx.ConnectError):
client.api_logs.find(request_id)

# 4 attempts total = 1 initial + 3 retries
assert len(httpx_mock.get_requests()) == 4
assert slept_durations == [1.0, 2.0, 4.0]

def test_network_error_recovers(self, httpx_mock: HTTPXMock, monkeypatch):
slept_durations = []
monkeypatch.setattr("lago_python_client.services.request.time.sleep", slept_durations.append)

client = Client(api_key="test-key")
request_id = "test-id"

# 2 failures followed by 1 success
httpx_mock.add_exception(
httpx.ConnectError("Connection failed"),
method="GET",
url=ENDPOINT + f"/{request_id}",
)
httpx_mock.add_exception(
httpx.ConnectError("Connection failed again"),
method="GET",
url=ENDPOINT + f"/{request_id}",
)
httpx_mock.add_response(
method="GET",
url=ENDPOINT + f"/{request_id}",
status_code=200,
content=mock_response("fixtures/api_log.json"),
)

result = client.api_logs.find(request_id)
assert result is not None
assert len(httpx_mock.get_requests()) == 3
assert slept_durations == [1.0, 2.0]

def test_server_error_502_503_504_retries_and_exhausts(self, httpx_mock: HTTPXMock, monkeypatch):
slept_durations = []
monkeypatch.setattr("lago_python_client.services.request.time.sleep", slept_durations.append)

client = Client(api_key="test-key")
request_id = "test-id"

# Queue 4 server error responses
for status_code in [502, 503, 504, 502]:
httpx_mock.add_response(
method="GET",
url=ENDPOINT + f"/{request_id}",
status_code=status_code,
)

with pytest.raises(httpx.HTTPStatusError):
client.api_logs.find(request_id)

assert len(httpx_mock.get_requests()) == 4
assert slept_durations == [1.0, 2.0, 4.0]