Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/fides/docs/config/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,21 @@ task_default_queue = "fides"
task_always_eager = true
```

#### Celery OpenTelemetry-style tracing

Celery tracing is **off** by default. When enabled, trace context is propagated across Celery boundaries and each completed task span is logged at INFO level (no OTLP collector is required). Spans are emitted as structured Loguru lines with the message `otel.celery.span`; each line includes the wall-clock `span_start_unix_ns` / `span_end_unix_ns` timestamps (nanoseconds since the Unix epoch) and, when both timestamps are present, `span_duration_ms`.

To turn tracing on, set the option in your TOML configuration file or export the equivalent environment variable:

```toml
[logging]
celery_otel_tracing = true
```

```sh
export FIDES__LOGGING__CELERY_OTEL_TRACING=true
```

### Credentials

The credentials section uses custom keys, which can be referenced in specific commands that take the `--credentials-id` option. For example, a command that uses a credential might look like `fides scan dataset db --credentials-id app_postgres`. The credential object itself is validated at the time of use, depending on what type of credential is required. For instance, if `fides scan system okta` is used, the object is expected to contain `orgUrl`, `clientId`, and `privateKey` key/value pairs for OAuth2 authentication. For a typical database such as Postgres, only a `connection_string` is expected. The following is an example of what a credentials section might look like in a given deployment with various applications:
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ dependencies = [
"okta==3.1.0",
"onepassword-sdk==0.3.0",
"openpyxl==3.0.9",
"opentelemetry-api~=1.28.0",
"opentelemetry-sdk~=1.28.0",
"packaging==23.0",
"paramiko==3.4.1",
"passlib[bcrypt]==1.7.4",
Expand Down
2 changes: 1 addition & 1 deletion src/fides/api/models/identity_definition.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from enum import Enum, StrEnum
from enum import StrEnum

from sqlalchemy import Boolean, Column, String, Text
from sqlalchemy.ext.declarative import declared_attr
Expand Down
82 changes: 59 additions & 23 deletions src/fides/api/task/graph_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import copy
import traceback
from abc import ABC
from contextlib import AbstractContextManager, nullcontext
from functools import wraps
from time import sleep
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
Expand Down Expand Up @@ -44,6 +45,8 @@
from fides.api.models.worker_task import ExecutionLogStatus
from fides.api.schemas.policy import ActionType, CurrentStep
from fides.api.service.connectors.base_connector import BaseConnector
from fides.api.service.connectors.saas_connector import SaaSConnector
from fides.api.service.connectors.sql_connector import SQLConnector
from fides.api.service.execution_context import collect_execution_log_messages
from fides.api.task.consolidate_query_matches import consolidate_query_matches
from fides.api.task.filter_element_match import filter_element_match
Expand All @@ -65,6 +68,7 @@
from fides.api.util.saas_util import FIDESOPS_GROUPED_INPUTS
from fides.common.session_management import get_autoclose_db_session as get_db
from fides.config import CONFIG
from fides.observability import celery_step_span

COLLECTION_FIELD_PATH_MAP = Dict[CollectionAddress, List[Tuple[FieldPath, FieldPath]]]

Expand Down Expand Up @@ -296,6 +300,35 @@ def can_write_data(self) -> bool:
return True
return connection_config.access == AccessLevel.write

def _dsr_graph_node_connector_span(
    self, action: str
) -> AbstractContextManager[Any]:
    """Build the context manager that wraps this node's connector I/O in a span.

    Only SQL and SaaS connectors are traced. For any other connector type a
    no-op ``nullcontext()`` is returned so callers can always use ``with``.

    Args:
        action: Label for the operation being performed; it is recorded as
            the ``dsr.node.action`` attribute and appended to the span name
            (``dsr.graph_task.<action>``).

    Returns:
        The context manager produced by ``celery_step_span`` for SQL/SaaS
        connectors, otherwise ``nullcontext()``.
    """
    node_connector = self.connector

    # Pick the connector family; SQL is checked before SaaS.
    traced_families = ((SQLConnector, "sql"), (SaaSConnector, "saas"))
    connector_family = next(
        (
            label
            for connector_cls, label in traced_families
            if isinstance(node_connector, connector_cls)
        ),
        None,
    )
    if connector_family is None:
        # Not a traced connector type: hand back a do-nothing context manager.
        return nullcontext()

    config = node_connector.configuration
    raw_type = config.connection_type
    # Enum members are reported by their value; anything else is stringified.
    if isinstance(raw_type, ConnectionType):
        type_label = raw_type.value
    else:
        type_label = str(raw_type)

    span_attributes: Dict[str, Any] = {
        "dsr.node.action": action,
        "dsr.node.collection": str(self.execution_node.address),
        "dsr.node.connection_key": config.key,
        "dsr.node.connection_type": type_label,
        "dsr.connector.family": connector_family,
    }
    # The SaaS version attribute is only attached when one is set.
    if self._saas_version:
        span_attributes["dsr.node.saas_version"] = self._saas_version

    span_name = f"dsr.graph_task.{action}"
    return celery_step_span(span_name, **span_attributes)

def _combine_seed_data(
self,
*data: List[Row],
Expand Down Expand Up @@ -730,13 +763,14 @@ def access_request(self, *inputs: List[Row]) -> List[Row]:

# Use execution context to capture postprocessor messages
with collect_execution_log_messages() as messages:
output: List[Row] = self.connector.retrieve_data(
self.execution_node,
self.resources.policy,
self.resources.request,
self.request_task,
formatted_input_data,
)
with self._dsr_graph_node_connector_span("access"):
output: List[Row] = self.connector.retrieve_data(
self.execution_node,
self.resources.policy,
self.resources.request,
self.request_task,
formatted_input_data,
)

if is_traversal_only:
# TRAVERSAL_ONLY bridge nodes: retrieve data for FK propagation
Expand Down Expand Up @@ -843,14 +877,15 @@ def erasure_request(

# Use execution context to capture postprocessor messages
with collect_execution_log_messages() as messages:
output = self.connector.mask_data(
self.execution_node,
self.resources.policy,
self.resources.request,
self.resources.privacy_request_task,
retrieved_data,
formatted_input_data,
)
with self._dsr_graph_node_connector_span("erasure"):
output = self.connector.mask_data(
self.execution_node,
self.resources.policy,
self.resources.request,
self.resources.privacy_request_task,
retrieved_data,
formatted_input_data,
)

self.request_task.rows_masked = output # Saved as part of update_status below

Expand Down Expand Up @@ -890,14 +925,15 @@ def consent_request(self, identity: Dict[str, Any]) -> bool:
"Sending consent request to connector {}",
self.connector.configuration.key,
)
output: bool = self.connector.run_consent_request(
self.execution_node,
self.resources.policy,
self.resources.request,
self.resources.privacy_request_task,
identity,
self.resources.session,
)
with self._dsr_graph_node_connector_span("consent"):
output: bool = self.connector.run_consent_request(
self.execution_node,
self.resources.policy,
self.resources.request,
self.resources.privacy_request_task,
identity,
self.resources.session,
)
self.request_task.consent_sent = output
logger.info(
"Consent request to {} completed, consent_sent={}",
Expand Down
3 changes: 3 additions & 0 deletions src/fides/api/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from fides.api.tasks import celery_healthcheck
from fides.api.util.logger import setup as setup_logging
from fides.config import CONFIG, FidesConfig
from fides.observability.celery_tracing import configure_celery_tracing

MESSAGING_QUEUE_NAME = "fidesops.messaging"
PRIVACY_PREFERENCES_QUEUE_NAME = "fides.privacy_preferences" # This queue is used in Fidesplus for saving privacy preferences and notices served
Expand Down Expand Up @@ -170,6 +171,8 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery:

app.autodiscover_tasks(autodiscover_task_locations)

configure_celery_tracing(config)

return app


Expand Down
4 changes: 4 additions & 0 deletions src/fides/config/logging_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ class LoggingSettings(FidesSettings):
default=False,
description="If True, PII values will display unmasked in log output. This variable should always be set to 'False' in production systems.",
)
celery_otel_tracing: bool = Field(
default=False,
description="When True, propagate OpenTelemetry trace context across Celery boundaries and log completed task spans at INFO (requires OpenTelemetry packages). Defaults to False; set to True to enable.",
)

@field_validator("destination", mode="before")
@classmethod
Expand Down
9 changes: 9 additions & 0 deletions src/fides/observability/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Observability helpers (Celery OpenTelemetry tracing, etc.)."""

from fides.observability.celery_tracing import (
celery_step_span,
celery_traced_function,
configure_celery_tracing,
)

# Public names of this package; all three are imported from
# fides.observability.celery_tracing above.
__all__ = ["celery_step_span", "celery_traced_function", "configure_celery_tracing"]
Loading
Loading