diff --git a/Makefile b/Makefile index 2041ded..c49a2bb 100644 --- a/Makefile +++ b/Makefile @@ -33,11 +33,11 @@ test: .PHONY: test-integration test-integration: - uv run pytest tests/integration -m "integration" --run-integration + uv run pytest tests -m "integration" --run-integration .PHONY: test-all test-all: - uv run pytest --cov=data_bridges_knots --cov-report=html + uv run pytest --cov=data_bridges_knots --cov-report=html --run-integration uv run coverage-badge -o assets/images/coverage.svg -f #* Typing @@ -51,7 +51,7 @@ mypy: #* All in one .PHONY: lint -lint: test check-codestyle mypy +lint: codestyle test-all mypy #* DOCS diff --git a/data_bridges_knots/__init__.py b/data_bridges_knots/__init__.py index 1bd84f2..2dbfbc8 100644 --- a/data_bridges_knots/__init__.py +++ b/data_bridges_knots/__init__.py @@ -4,11 +4,12 @@ Wrapper for DataBridges client. """ -from .client import DataBridgesShapes, config_from_env +from .client import DataBridgesKnots, DataBridgesShapes, config_from_env from .labels import get_choice_labels, get_variable_labels, map_value_labels __all__ = [ "DataBridgesShapes", + "DataBridgesKnots", "labels", "get_variable_labels", "get_choice_labels", diff --git a/data_bridges_knots/client.py b/data_bridges_knots/client.py index c98e046..1b615fb 100644 --- a/data_bridges_knots/client.py +++ b/data_bridges_knots/client.py @@ -1,19 +1,63 @@ -from typing import Dict, Literal, Optional, Union +from typing import Dict, Union import logging import os -import time import warnings -from datetime import date import data_bridges_client -import numpy as np -import pandas as pd import yaml -from data_bridges_client.rest import ApiException from data_bridges_client.token import WfpApiToken -from data_bridges_knots.helpers import get_adm0_code +from data_bridges_knots.endpoints.commodityApi import ( + get_commodities_list, + get_commodity_categories_list, + get_commodity_units_conversion_list, + get_commodity_units_list, +) +from data_bridges_knots.endpoints.currencyApi import ( + get_currency_list, + get_exchange_rates, + get_usd_indirect_quotation, +) +from data_bridges_knots.endpoints.economicDataApi import get_economic_indicator_list +from data_bridges_knots.endpoints.globalOutlookApi import get_global_outlook +from data_bridges_knots.endpoints.householdApi import ( + get_choice_list, + get_household_questionnaire, + get_household_survey, + get_household_surveys_list, + get_household_xlsform_definition, +) +from data_bridges_knots.endpoints.hungerHotpotApi import get_hunger_hotspot_data +from data_bridges_knots.endpoints.incubationApi import ( + get_cari_data, +) +from data_bridges_knots.endpoints.ipcChApi import ( + get_ipc_and_equivalent_data, +) +from data_bridges_knots.endpoints.marketPricesApi import get_prices +from data_bridges_knots.endpoints.marketsApi import ( + get_market_geojson_list, + get_markets_as_csv, + get_markets_list, + get_nearby_markets, +) +from data_bridges_knots.endpoints.rpmeApi import ( + get_rpme_base_data, + get_rpme_full_data, + get_rpme_output_values, + get_rpme_surveys, + get_rpme_variables, + get_rpme_xls_forms, +) +from data_bridges_knots.endpoints.surveysApi import ( + get_mfi_surveys, + get_mfi_surveys_base_data, + get_mfi_surveys_full_data, + get_mfi_surveys_processed_data, + get_mfi_xls_forms, + get_mfi_xls_forms_detailed, +) logname = "data_bridges_api_calls.log" logging.basicConfig( @@ -77,7 +121,7 @@ def config_from_env() -> Dict: return config -class DataBridgesShapes: +class DataBridgesKnots: """Interface to the Data Bridges API. Provides methods for fetching market prices, exchange rates, food security data, @@ -85,7 +129,7 @@ class DataBridgesShapes: environment variables. Args: - yaml_config_path (str | dict): Either: + config_path (str | dict): Either: - Path to a YAML configuration file (str), or - Configuration dictionary (e.g. .env) with required keys: WFP_API_CLIENT_ID, WFP_API_CLIENT_SECRET, and optionally DATABRIDGES_API_KEY @@ -104,45 +148,29 @@ class DataBridgesShapes: ... 'WFP_API_CLIENT_SECRET': 'your-client-secret', ... 'DATABRIDGES_API_KEY': 'optional-databridges-key' ... } - >>> client = DataBridgesShapes(config) + >>> client = DataBridgesKnots(config) >>> # Initialize from environment variables >>> from data_bridges_knots.client import config_from_env - >>> client = DataBridgesShapes(config_from_env()) + >>> client = DataBridgesKnots(config_from_env()) """ - def __init__(self, yaml_config_path, env="prod", api_version="v1"): - - warnings.warn( - ( - "\n[FUTURE WARNING]\n" - "DataBridgesShapes will be renamed to 'DataBridgesKnots' " - "in version 4.0.0 (next major release, scheduled for 1 July 2026).\n" - "Please update your imports accordingly.\n" - ), - FutureWarning, - stacklevel=2, - ) - - # Initialize instance variables + def __init__(self, config_path, env="prod", api_version="v1"): self.api_version = api_version self.env = env self.xlsform = None - # Load and validate config once - self.config = self._load_config(yaml_config_path) + self.config = self._load_config(config_path) self._validate_config(self.config) - - # Setup authentication and extract API key self.configuration = self._setup_configuration_and_authentication(self.config) self.data_bridges_api_key = self.config.get("DATABRIDGES_API_KEY", "") def __repr__(self): - return f"DataBridgesShapes(host='{self.configuration.host}', env='{self.env}'), api_version='{self.api_version}'" + return f"DataBridgesKnots(host='{self.configuration.host}', env='{self.env}'), api_version='{self.api_version}'" def __str__(self): return ( - f"DataBridgesShapes\n" + f"DataBridgesKnots\n" f" API Host: {self.configuration.host}\n" f" Environment: {self.env}\n" f"\n" @@ -218,1414 +246,96 @@ def _setup_configuration_and_authentication(self, config: Dict): logger.debug("Token used: %s", token.__repr__()) return configuration - def get_prices( - self, - country_iso3: str, - start_date: Optional[str] = None, - end_date: Optional[str] = None, - page_size: int = 1000, - market_id: int = 0, - commodity_id: int = 0, - currency_id: int = 0, - price_flag: str = "", - latest_value_only: bool = False, - ) -> pd.DataFrame: - """Fetches market price data for a given country within a specified date range. - - Args: - country_iso3 (str): The ISO 3-letter country code - start_date (str, optional): Start date in ISO format (e.g., '2022-01-01'). - If None, defaults to today's date. - end_date (str, optional): End date in ISO format (e.g., '2022-01-01'). - If None, defaults to today's date. - page_size (int, optional): Number of items per page. Defaults to 1000. - market_id (int, optional): Unique ID of a Market. Defaults to 0. - commodity_id (int, optional): The exact ID of a Commodity. Defaults to 0. - currency_id (int, optional): The exact ID of a currency. Defaults to 0. - price_flag (str, optional): Type of price data: [actual|aggregate|estimated|forecasted]. Defaults to ''. - latest_value_only (bool, optional): Whether to return only latest values. Defaults to False. - - Returns: - pd.DataFrame: DataFrame containing market price data - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Basic usage with dates - >>> df_prices = client.get_prices("KEN", start_date="2025-01-01", end_date="2025-12-31") - >>> # Using additional filters - >>> df_prices = client.get_prices( - ... "KEN", - ... start_date="2025-01-01", - ... market_id=123, - ... commodity_id=456, - ... price_flag="actual" - ... ) - """ - if start_date: - # Format the date according to RFC 3339 standard - start_date = date.fromisoformat(start_date).strftime( - "%Y-%m-%dT%H:%M:%S+01:00" - ) - else: - start_date = date.today().strftime("%Y-%m-%dT%H:%M:%S+01:00") - - if end_date: - # Format the date according to RFC 3339 standard - end_date = date.fromisoformat(end_date).strftime("%Y-%m-%dT%H:%M:%S+01:00") - else: - end_date = date.today().strftime("%Y-%m-%dT%H:%M:%S+01:00") - - responses = [] - total_items = 20 - max_item = 0 - page = 0 - while total_items > max_item: - page += 1 - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.MarketPricesApi(api_client) - env = self.env - - try: - api_prices = api_instance.market_prices_price_monthly_get( - country_code=country_iso3, - market_id=market_id, - commodity_id=commodity_id, - currency_id=currency_id, - price_flag=price_flag, - format="json", - page=page, - env=env, - start_date=start_date, - end_date=end_date, - latest_value_only=latest_value_only, - ) - responses.extend(item.to_dict() for item in api_prices.items) - total_items = api_prices.total_items - logger.info("Fetching page %s/n", page) - max_item = page * page_size - time.sleep(1) - except ApiException as e: - logger.error( - "Exception when calling Market price data->market_prices_price_monthly_get: %s\n", - e, - ) - raise - - df = pd.DataFrame(responses) - df = df.replace({np.nan: None}) - return df - - def get_exchange_rates( - self, country_iso3: str, page_size: int = 1000 - ) -> pd.DataFrame: - """Retrieves exchange rates for a given country from the Data Bridges API. - - Args: - country_iso3 (str): The ISO3 country code - page_size (int, optional): Number of items per page. Defaults to 1000 - - Returns: - pd.DataFrame: DataFrame containing exchange rate data with columns: - - date: Date of exchange rate - - rate: Exchange rate value - - currency: Currency code - And other relevant exchange rate information - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get exchange rates for Ethiopia - >>> rates_df = client.get_exchange_rates("ETH") - >>> # Check latest exchange rate - >>> latest_rate = rates_df.sort_values('date').iloc[-1] - - Raises: - ApiException: If there's an error calling the Exchange rates API - """ - - responses = [] - total_items = 20 - max_item = 0 - page = 0 - while total_items > max_item: - page += 1 - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.CurrencyApi(api_client) - env = self.env - - try: - api_exchange_rates = ( - api_instance.currency_usd_indirect_quotation_get( - country_iso3=country_iso3, format="json", page=page, env=env - ) - ) - responses.extend( - item.to_dict() for item in api_exchange_rates.items - ) - total_items = api_exchange_rates.total_items - logger.info("Fetching page %s", page) - max_item = page * page_size - time.sleep(1) - except ApiException as e: - logger.error( - "Exception when calling Exchange rates data-> : %s\n", - e, - ) - raise - df = pd.DataFrame(responses) - df = df.replace({np.nan: None}) - return df - - def get_commodities_list( - self, - country_iso3: Optional[str] = None, - commodity_name: Optional[str] = None, - commodity_id: Optional[int] = 0, - page: Optional[int] = 1, - format: Optional[str] = "json", - ) -> pd.DataFrame: - """ - Retrieves the detailed list of commodities available in the DataBridges platform. - - Args: - country_iso3 (str, optional): The code to identify the country. It can be an ISO-3166 Alpha 3 code or the VAM internal admin0code. - commodity_name (str, optional): The name, even partial and case insensitive, of a commodity. - commodity_id (int, optional): The exact ID of a commodity. Defaults to 0. - page (int, optional): Page number for paged results. Defaults to 1. - format (str, optional): Output format: 'json' or 'csv'. Defaults to 'json'. - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get full list of commmodities - >>> commodities_list = client.get_commodities_list() - >>> # Get commodities for Tanzania - >>> commodities_df = client.get_commodities_list(country_iso3="TZA") - >>> # Get commodity with name containing "Maize" - >>> maize_df = client.get_commodities_list(commodity_name="Maize") - >>> # Get commodity with specific ID - >>> specific_commodity_df = client.get_commodities_list(commodity_id=123) - - Returns: - pandas.DataFrame: A DataFrame containing the retrieved commodity data. - """ - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.CommoditiesApi(api_client) - env = self.env - - try: - api_response = api_instance.commodities_list_get( - country_code=country_iso3, - commodity_name=commodity_name, - commodity_id=commodity_id, - page=page, - format=format, - env=env, - ) - logger.info("Successfully retrieved commodities list") - - # Convert the response to a DataFrame - if hasattr(api_response, "items"): - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - else: - df = pd.DataFrame([api_response.to_dict()]) - - df = df.replace({np.nan: None}) - return df - - except ApiException as e: - logger.error( - f"Exception when calling CommoditiesApi->commodities_list_get: {e}" - ) - raise - - def get_commodity_units_conversion_list( - self, - country_iso3: Optional[str] = None, - commodity_id: Optional[int] = 0, - from_unit_id: Optional[int] = 0, - to_unit_id: Optional[int] = 0, - page: Optional[int] = 1, - format: Optional[str] = "json", - ) -> pd.DataFrame: - """ - Retrieves conversion factors to Kilogram or Litres for each convertible unit of measure. - - Args: - country_iso3 (str, optional): The code to identify the country. It can be an ISO-3166 Alpha 3 code or the VAM internal admin0code. - commodity_id (int, optional): The exact ID of a Commodity, as found in /Commodities/List. Defaults to 0. - from_unit_id (int, optional): The exact ID of the original unit of measure of the price of a commodity. Defaults to 0. - to_unit_id (int, optional): The exact ID of the converted unit of measure of the price of a commodity. Defaults to 0. - page (int, optional): Page number for paged results. Defaults to 1. - format (str, optional): Output format: 'json' or 'csv'. Defaults to 'json'. - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get full list of commodity units conversions - >>> full_list = client.get_commodity_units_conversion_list() - >>> # Get conversion factors for Tanzania - >>> conversion_factors_df = client.get_commodity_units_conversion_list(country_iso3="TZA") - - Returns: - pandas.DataFrame: A DataFrame containing the retrieved conversion factors. - """ - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.CommodityUnitsApi(api_client) - env = self.env - - try: - api_response = api_instance.commodity_units_conversion_list_get( - country_code=country_iso3, - commodity_id=commodity_id, - from_unit_id=from_unit_id, - to_unit_id=to_unit_id, - page=page, - format=format, - env=env, - ) - logger.info("Successfully retrieved commodity units conversion list") - - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - - except ApiException as e: - logger.error( - f"Exception when calling CommodityUnitsApi->commodity_units_conversion_list_get: {e}" - ) - raise - - def get_commodity_units_list( - self, - country_iso3: Optional[str] = None, - commodity_unit_name: Optional[str] = None, - commodity_unit_id: Optional[int] = 0, - page: Optional[int] = 1, - format: Optional[str] = "json", - ) -> pd.DataFrame: - """ - Retrieves the detailed list of the unit of measure available in DataBridges platform. - - Args: - country_iso3 (str, optional): The code to identify the country. It can be an ISO-3166 Alpha 3 code or the VAM internal admin0code. - commodity_unit_name (str, optional): The name, even partial and case insensitive, of a commodity unit. - commodity_unit_id (int, optional): The exact ID of a commodity unit. Defaults to 0. - page (int, optional): Page number for paged results. Defaults to 1. - format (str, optional): Output format: 'json' or 'csv'. Defaults to 'json'. - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get commodity units for Tanzania - >>> units_df = client.get_commodity_units_list(country_iso3="TZA") - >>> # Get commodity unit with name containing "Kg" - >>> kg_unit_df = client.get_commodity_units_list(commodity_unit_name="Kg") - >>> # Get commodity unit with specific ID - >>> specific_unit_df = client.get_commodity_units_list(commodity_unit_id=5) - - Returns: - pandas.DataFrame: A DataFrame containing the retrieved commodity units data. - """ - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.CommodityUnitsApi(api_client) - env = self.env - - try: - api_response = api_instance.commodity_units_list_get( - country_code=country_iso3, - commodity_unit_name=commodity_unit_name, - commodity_unit_id=commodity_unit_id, - page=page, - format=format, - env=env, - ) - logger.info("Successfully retrieved commodity units list") - - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - - except ApiException as e: - logger.error( - f"Exception when calling CommodityUnitsApi->commodity_units_list_get: {e}" - ) - raise - - def get_currency_list( - self, - country_iso3: Optional[str] = None, - currency_name: Optional[str] = None, - currency_id: Optional[str] = 0, - page: Optional[int] = 1, - format: Optional[str] = "json", - ): - """ - Returns the list of currencies available in the internal VAM database, with Currency 3-letter code, matching with ISO 4217. - - Args: - country_iso3 (str, optional): The code to identify the country. It can be an ISO-3166 Alpha 3 code or the VAM internal admin0code. - currency_name (str, optional): Currency 3-letter code, matching with ISO 4217. - currency_id (int, optional): Unique code to identify the currency in internal VAM currencies. Defaults to 0. - page (int, optional): Page number for paged results. Defaults to 1. - format (str, optional): Output format: 'json' or 'csv'. Defaults to 'json'. - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get currencies for Tanzania - >>> currencies_df = client.get_currency_list(country_iso3="TZA") - >>> # Get currency with name "ETB" - >>> etb_df = client.get_currency_list(currency_name="ETB") - >>> # Get currency with specific ID - >>> specific_currency_df = client.get_currency_list(currency_id=1) - - Returns: - pandas.DataFrame: A DataFrame containing the retrieved currency data. - """ - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.CurrencyApi(api_client) - env = self.env - - try: - api_response = api_instance.currency_list_get( - country_code=country_iso3, - currency_name=currency_name, - currency_id=currency_id, - page=page, - format=format, - env=env, - ) - logger.info("Successfully retrieved currency list") - - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - - except ApiException as e: - logger.error( - f"Exception when calling CurrencyApi->currency_list_get: {e}" - ) - raise - - def get_usd_indirect_quotation( - self, - country_iso3: Optional[str] = "", - currency_name: Optional[str] = "", - page: Optional[int] = 1, - format: Optional[str] = "json", - ): - """ - Returns the value of the Exchange rates from Trading Economics, for official rates, and DataViz for unofficial rates. - - Args: - country_iso3 (str, optional): The code to identify the country. Must be a ISO-3166 Alpha 3 code. Defaults to ''. - currency_name (str, optional): The ISO3 code for the currency, based on ISO4217. Defaults to ''. - page (int, optional): Page number for paged results. Defaults to 1. - format (str, optional): Output format: 'json' or 'csv'. Defaults to 'json'. - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get USD indirect quotation for Ethiopia - >>> usd_df = client.get_usd_indirect_quotation(country_iso3="ETH") - >>> # Get USD indirect quotation for currency "ETB" - >>> etb_usd_df = client.get_usd_indirect_quotation(currency_name="ETB") - - Returns: - pandas.DataFrame: A DataFrame containing the retrieved exchange rate data. - """ - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.CurrencyApi(api_client) - env = self.env - - try: - api_response = api_instance.currency_usd_indirect_quotation_get( - country_iso3=country_iso3, - currency_name=currency_name, - page=page, - format=format, - env=env, - ) - logger.info("Successfully retrieved USD indirect quotation data") - - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - - except ApiException as e: - logger.error( - f"Exception when calling CurrencyApi->currency_usd_indirect_quotation_get: {e}" - ) - raise - - def get_economic_indicator_list( - self, - page: Optional[int] = 1, - indicator_name: Optional[str] = "", - country_iso3: Optional[str] = "", - format: Optional[str] = "json", - ): - """ - Returns the lists of indicators for which Vulnerability Analysis and Mapping - Economic and Market Analysis Unit has redistribution licensing from Trading Economics. - - Args: - page (int, optional): Page number for paged results. Defaults to 1. - indicator_name (str, optional): Unique indicator name. Defaults to ''. - iso3 (str, optional): The code to identify the country. Must be a ISO-3166 Alpha 3 code. Defaults to ''. - format (str, optional): Output format: 'json' or 'csv'. Defaults to 'json'. - - Returns: - pandas.DataFrame: A DataFrame containing the retrieved economic indicator data. - """ - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - # Create an instance of the API class - api_instance = data_bridges_client.EconomicDataApi(api_client) - - try: - # Returns the lists of indicators. - api_response = api_instance.economic_data_indicator_list_get( - page=page, - indicator_name=indicator_name, - iso3=country_iso3, - format=format, - env=self.env, - ) - logger.info( - "The response of EconomicDataApi->economic_data_indicator_list_get:\n" - ) - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - return api_response - except Exception as e: - logger.error( - "Exception when calling EconomicDataApi->economic_data_indicator_list_get: %s", - e, - ) - raise - - def get_market_geojson_list(self, country_iso3: str = None): - """Returns a list of geo-referenced markets in a specific country.""" - if country_iso3 is None: - raise ValueError("country_iso3 parameter is required") - else: - adm0code = get_adm0_code(country_iso3) - - # Enter a context with an instance of the API client - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - # Create an instance of the API class - api_instance = data_bridges_client.MarketsApi(api_client) - - try: - # Provide a list of geo referenced markets in a specific country - api_response = api_instance.markets_geo_json_list_get( - adm0code=adm0code, env=self.env - ) - logger.info("The response of MarketsApi->markets_geo_json_list_get:\n") - - geojson_dict = api_response.model_dump() - - return geojson_dict - return api_response - except Exception as e: - logger.error( - "Exception when calling MarketsApi->markets_geo_json_list_get: %s", - e, - ) - raise - - def get_markets_list( - self, country_iso3: Optional[str] = None, page: Optional[int] = 1 - ) -> pd.DataFrame: - """Retrieves a complete list of markets in a country. - - Args: - country_iso3 (str, optional): The ISO3 code to identify the country. Defaults to None. - page (int, optional): Page number for paginated results. Defaults to 1. - - Returns: - pd.DataFrame: DataFrame containing market information with columns: - - market_id: Unique identifier for the market - - market_name: Name of the market - - adm0_code: Country administrative code - - latitude: Market location latitude - - longitude: Market location longitude - And other market-related fields - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get markets for Afghanistan - >>> afg_markets = client.get_markets_list("AFG") - - Raises: - ApiException: If there's an error accessing the Markets API - """ - # Enter a context with an instance of the API client - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - # Create an instance of the API class - api_instance = data_bridges_client.MarketsApi(api_client) - format = "json" # str | Output format: [JSON|CSV] Json is the default value (optional) (default to 'json') - env = ( - self.env - ) # str | Environment. * `prod` - api.vam.wfp.org * `dev` - dev.api.vam.wfp.org (optional) - - try: - # Get a complete list of markets in a country - api_response = api_instance.markets_list_get( - country_code=country_iso3, page=page, format=format, env=env - ) - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - except Exception as e: - logger.error( - "Exception when calling MarketsApi->markets_list_get: %s", e - ) - raise - - def get_markets_as_csv( - self, country_iso3: Optional[str] = None, local_names: bool = False - ) -> str: - """Retrieves a complete list of markets in a country in CSV format. - - Args: - country_iso3 (str, optional): Country administrative code. Defaults to None. - local_names (bool, optional): If True, market and region names will be - localized if available. Defaults to False. - - Returns: - str: CSV formatted string containing market data - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get markets CSV for Afghanistan - >>> markets_csv = client.get_markets_as_csv("AFG") - >>> # Get localized market names - >>> local_markets = client.get_markets_as_csv("AFG", local_names=True) - - Raises: - ApiException: If there's an error accessing the Markets API - """ - - adm0code = get_adm0_code(country_iso3) - - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.MarketsApi(api_client) - local_names = False # bool | If true the name of markets and regions will be localized if available (optional) (default to False) - - try: - # Get a complete list of markets in a country - api_response = api_instance.markets_markets_as_csv_get( - adm0code=adm0code, local_names=local_names, env=self.env - ) - logger.info("The response of MarketsApi->markets_markets_as_csv_get:\n") - return api_response - except Exception as e: - logger.error( - "Exception when calling MarketsApi->markets_markets_as_csv_get: %s", - e, - ) - raise - - def get_nearby_markets( - self, country_iso3: str = None, lat: float = None, lng: float = None - ) -> pd.DataFrame: - """Finds markets near a given location within a 15km distance. - - Args: - country_iso3 (str): Country administrative code. Defaults to None. - lat (float): Latitude of the search point. Defaults to None. - lng (float): Longitude of the search point. Defaults to None. - - Returns: - pd.DataFrame: DataFrame containing nearby markets with columns: - - market_id: Unique identifier for the market - - market_name: Name of the market - - distance: Distance from search point in kilometers - - latitude: Market location latitude - - longitude: Market location longitude - And other market-related fields - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Find markets near coordinates in Afghanistan - >>> nearby = client.get_nearby_markets("AFG", 34.515, 69.208) - >>> # Sort markets by distance - >>> closest = nearby.sort_values('distance').iloc[0] - - Raises: - ApiException: If there's an error accessing the Markets API - """ - - adm0code = get_adm0_code(country_iso3) - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.MarketsApi(api_client) - env = self.env - - try: - api_response = api_instance.markets_nearby_markets_get( - adm0code=adm0code, lat=lat, lng=lng, env=env - ) - logger.info("Successfully retrieved nearby markets") - df = pd.DataFrame([item.to_dict() for item in api_response]) - df = df.replace({np.nan: None}) - return df - except ApiException as e: - logger.error( - f"Exception when calling MarketsApi->markets_nearby_markets_get: {e}" - ) - raise - - def get_global_outlook( - self, - data_type: Literal["country_latest", "global_latest", "regional_latest"], - page: Optional[int] = None, - ) -> pd.DataFrame: - """Retrieves data from the Global Outlook API. - - The Global Outlook API provides access to WFP’s forward-looking analysis and - aggregated insights at different geographical levels, including country, - regional, and global summaries. - - Args: - data_type (str): The type of Global Outlook data to retrieve. Must be one of: - - 'country_latest': Latest data at country level - - 'global_latest': Latest global aggregated data - - 'regional_latest': Latest data aggregated by region - page (int, optional): Page number for paginated results. Currently not used - for latest endpoints. Defaults to None. - - Returns: - pd.DataFrame: DataFrame containing Global Outlook data for the selected scope. - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get latest country-level outlook - >>> country_data = client.get_global_outlook("country_latest") - >>> # Get global outlook summary - >>> global_data = client.get_global_outlook("global_latest") - >>> # Get regional outlook data - >>> regional_data = client.get_global_outlook("regional_latest") - - Raises: - ValueError: If data_type is not one of the allowed values - ApiException: If there is an error accessing the Global Outlook API - """ - - # Enter a context with an instance of the API client - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - # Create an instance of the API class - api_instance = data_bridges_client.GlobalOutlookApi(api_client) - env = ( - self.env - ) # str | Environment. * `prod` - api.vam.wfp.org * `dev` - dev.api.vam.wfp.org (optional) - - try: - if data_type == "country_latest": - api_response = api_instance.global_outlook_country_latest_get( - env=env - ) - elif data_type == "global_latest": - api_response = api_instance.global_outlook_global_latest_get( - env=env - ) - - elif data_type == "regional_latest": - api_response = api_instance.global_outlook_regional_latest_get( - env=env - ) - else: - raise ValueError(f"Invalid data_type: {data_type}") - logger.info( - f"Successfully retrieved Global Outlook data for type: {data_type}" - ) - return pd.DataFrame([item.to_dict() for item in api_response.items]) - - except Exception as e: - logger.error( - "Exception when calling GlobalOutlookApi->%s: %s", data_type, e - ) - raise - - def get_household_survey( - self, survey_id: int, access_type: str, page_size: Optional[int] = 600 - ) -> pd.DataFrame: - """Retrieves household survey data using the specified access type. - - Args: - survey_id (int]): The ID of the survey to retrieve - access_type (str): The type of access to use. Must be one of: - - 'draft': Draft internal base data (requires API key) - - 'full': Complete survey data (requires API key). Data is returned as inserted by the country office and it might contain PII and unstandardized fields. - - - 'official': Official use base data. Only data mapped against the standards is returned. - - 'public': Public base data - page_size (int, optional): Number of items per page. Defaults to 600 - - Returns: - pd.DataFrame: DataFrame containing survey data with columns specific to the - access type and survey structure - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get full, unmapped survey data - >>> full_data = client.get_household_survey(3094, "full") - >>> # Get standard data for official use (no PII) - >>> official_data = client.get_household_survey(3094, "official") - - Raises: - KeyError: If access_type is not one of the allowed values - ApiException: If there's an error accessing the API - """ - responses = [] - total_items = 1 - max_item = 0 - page = 0 - - while total_items > max_item: - page += 1 - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.IncubationApi(api_client) - env = self.env - - try: - logger.info(f"Calling get_household_survey for survey {survey_id}") - # Select appropriate API call based on access_type - api_call = { - "full": api_instance.household_full_data_get, - "draft": api_instance.household_draft_internal_base_data_get, - "official": api_instance.household_official_use_base_data_get, - "public": api_instance.household_public_base_data_get, - }.get(access_type) - - if access_type in ["full", "draft"]: - try: - api_survey = api_call( - self.data_bridges_api_key, - survey_id=survey_id, - page=page, - page_size=page_size, - env=env, - ) - except ApiException as e: - logger.error( - f"API key required when calling Household data-> '{access_type}': {e}" - ) - raise - else: - api_survey = api_call( - survey_id=survey_id, page=page, page_size=page_size, env=env - ) - - logger.info(f"Fetching page {page}") - logger.info(f"Items: {len(api_survey.items)}") - responses.extend(api_survey.items) - total_items = api_survey.total_items - max_item = len(api_survey.items) + max_item - time.sleep(1) - - except ApiException as e: - logger.error( - f"Exception when calling Household data-> {access_type}{e}" - ) - raise - - df = pd.DataFrame(responses) - return df - - def get_household_surveys_list( - self, - country_iso3: Optional[int] = None, - page: Optional[int] = 1, - start_date: Optional[str] = None, - end_date: Optional[str] = None, - survey_id: Optional[int] = None, - ) -> pd.DataFrame: - """Retrieves a list of household surveys for a country with their metadata. - - Args: - country_iso3 (str, optional): ISO3 Country code - page (int, optional): Page number for paginated results. Defaults to 1 - start_date (str, optional): Start date filter in ISO format (YYYY-MM-DD) - end_date (str, optional): End date filter in ISO format (YYYY-MM-DD) - survey_id (int, optional): Specific survey ID to retrieve - - Returns: - pd.DataFrame: DataFrame containing survey metadata with columns: - - survey_id: Unique identifier for the survey - - xls_form_id: ID of the questionnaire form used - - title: Survey title - - country: Country name - - start_date: Survey start date - - end_date: Survey end date - And other metadata fields - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get all surveys for a country - >>> surveys = client.get_household_surveys_list(country_iso3="COG") - >>> # Get surveys within date range - >>> surveys = client.get_household_surveys_list( - ... country_iso3="COG", - ... start_date="2024-01-01", - ... end_date="2024-12-31" - ... ) - - Raises: - ApiException: If there's an error accessing the API - """ - - adm0code = get_adm0_code(country_iso3) if country_iso3 else None - - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.IncubationApi(api_client) - env = self.env - - try: - api_response = api_instance.household_surveys_get( - adm0_code=adm0code, - page=page, - start_date=start_date, - end_date=end_date, - survey_id=survey_id, - env=env, - ) - logger.info("Successfully retrieved household surveys") - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - except ApiException as e: - logger.error( - f"Exception when calling IncubationApi->household_surveys_get: {e}" - ) - raise - - def get_household_xlsform_definition(self, xls_form_id: int) -> pd.DataFrame: - """Retrieves the complete XLS Form definition for a questionnaire. - - Args: - xls_form_id (int): The ID of the questionnaire form to retrieve - - Returns: - pd.DataFrame: DataFrame containing the form definition with columns: - - fields: List of field definitions - - choices: Available choices for select questions - - settings: Form settings - And other form structure information - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get form definition - >>> form_def = client.get_household_xlsform_definition(2067) - >>> # Access form fields - >>> fields = form_def['fields'].iloc[0] - - Raises: - ApiException: If there's an error accessing the API - """ - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.IncubationApi(api_client) - env = self.env - - try: - api_response = api_instance.xls_forms_definition_get( - xls_form_id=xls_form_id, env=env - ) - logger.info( - f"Successfully retrieved XLS Form definition for ID: {xls_form_id}" - ) - self.xlsform = pd.DataFrame([item.to_dict() for item in api_response]) - return self.xlsform - - except ApiException as e: - logger.error( - f"Exception when calling IncubationApi->xls_forms_definition_get: {e}" - ) - raise - - def get_household_questionnaire(self, xls_form_id: int) -> pd.DataFrame: - """Extracts the questionnaire structure from an XLS Form definition. - - Args: - xls_form_id (int): The ID of the questionnaire form to process - Returns: - pd.DataFrame: DataFrame containing the questionnaire structure with - one row per field in the form - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> questionnaire = client.get_household_questionnaire(2075) - """ - if self.xlsform is None: - self.xlsform = self.get_household_xlsform_definition(xls_form_id) - return pd.DataFrame(list(self.xlsform.fields)[0]) +class DataBridgesShapes(DataBridgesKnots): + def __init__(self, *args, **kwargs): + warnings.warn( + ( + "\n[FUTURE WARNING]" + "DataBridgesShapes will be deprecated and be removed in v4.0.0 (July 2026).\n" + "Use 'DataBridgesKnots' instead.\n" + ), + FutureWarning, + stacklevel=2, + ) + super().__init__(*args, **kwargs) - def get_choice_list(self, xls_form_id: int) -> pd.DataFrame: - """Extracts choice lists from a questionnaire form definition. + def __repr__(self): + return f"DataBridgesShapes(host='{self.configuration.host}', env='{self.env}'), api_version='{self.api_version}'" - Args: - xls_form_id (int): The ID of the questionnaire form to process + def __str__(self): + return ( + f"DataBridgesShapes\n" + f" API Host: {self.configuration.host}\n" + f" Environment: {self.env}\n" + f"\n" + f"Brought to you with <3 by WFP VAM" + ) - Returns: - pd.DataFrame: DataFrame containing choice lists with columns: - - name: Name of the choice list - - value: Choice value/code - - label: Human-readable choice label - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> choices = client.get_choice_list(123) - """ - questionnaire = self.get_household_questionnaire(xls_form_id) - choiceList = pd.json_normalize(questionnaire["choiceList"]).dropna() - choices = choiceList.explode("choices") - choices["value"] = choices["choices"].apply(lambda x: x["name"]) - choices["label"] = choices["choices"].apply(lambda x: x["label"]) - return choices[["name", "value", "label"]] +# Binding endpoints to the DataBridgesKnots class +# Household Endpoints (IncubationApi) +DataBridgesKnots.get_household_survey = get_household_survey +DataBridgesKnots.get_household_surveys_list = get_household_surveys_list +DataBridgesKnots.get_household_xlsform_definition = get_household_xlsform_definition +DataBridgesKnots.get_household_questionnaire = get_household_questionnaire +DataBridgesKnots.get_choice_list = get_choice_list - def get_mfi_surveys_full_data( - self, survey_id=None, page: Optional[int] = 1, page_size=20 - ): - """ - Get a full dataset that includes all the fields included in the survey in addition to the core Market Functionality Index (MFI) fields by Survey ID. - """ - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.SurveysApi(api_client) - env = self.env - try: - api_response = api_instance.m_fi_surveys_full_data_get( - survey_id=survey_id, - format="json", - page=page, - page_size=page_size, - env=env, - ) - logger.info("Successfully retrieved MFI surveys full data") - df = pd.DataFrame(api_response.items) - return df - except ApiException as e: - logger.error( - f"Exception when calling SurveysApi->m_fi_surveys_full_data_get: {e}" - ) - raise - - def get_mfi_surveys( - self, adm0_code=0, page: Optional[int] = 1, start_date=None, end_date=None - ): - """ - Retrieve Survey IDs, their corresponding XLS Form IDs, and Base XLS Form of all MFI surveys conducted in a country. - """ - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.SurveysApi(api_client) - env = self.env - - try: - api_response = api_instance.m_fi_surveys_get( - adm0_code=adm0_code, - page=page, - start_date=start_date, - end_date=end_date, - env=env, - ) - logger.info("Successfully retrieved MFI surveys list") - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - except ApiException as e: - logger.error( - f"Exception when calling SurveysApi->m_fi_surveys_get: {e}" - ) - raise - - def get_mfi_surveys_processed_data( - self, - survey_id=None, - page: Optional[int] = 1, - page_size=20, - format: Optional[str] = "json", - start_date=None, - end_date=None, - adm0_codes=None, - market_id=None, - survey_type=None, - ): - """ - Get MFI processed data in long format. - """ - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.SurveysApi(api_client) - env = self.env - - try: - api_response = api_instance.m_fi_surveys_processed_data_get( - survey_id=survey_id, - page=page, - page_size=page_size, - format=format, - start_date=start_date, - end_date=end_date, - adm0_codes=adm0_codes, - market_id=market_id, - survey_type=survey_type, - env=env, - ) - logger.info("Successfully retrieved MFI surveys processed data") - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - except ApiException as e: - logger.error( - f"Exception when calling SurveysApi->m_fi_surveys_processed_data_get: {e}" - ) - raise - - # TODO: Get the scope and test these functions - def get_rpme_base_data( - self, survey_id=None, page: Optional[int] = 1, page_size=20 - ): - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.RpmeApi(api_client) - env = self.env - - try: - api_response = api_instance.rpme_base_data_get( - survey_id=survey_id, page=page, page_size=page_size, env=env - ) - logger.info("Successfully retrieved RPME base data") - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - except ApiException as e: - logger.error( - f"Exception when calling RpmeApi->rpme_base_data_get: {e}" - ) - raise - - # TODO: Get the scope and test these functions - def get_rpme_full_data( - self, - survey_id=None, - format: Optional[str] = "json", - page: Optional[int] = 1, - page_size=20, - ): - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.RpmeApi(api_client) - env = self.env - - try: - api_response = api_instance.rpme_full_data_get( - survey_id=survey_id, - format=format, - page=page, - page_size=page_size, - env=env, - ) - logger.info("Successfully retrieved RPME full data") - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - except ApiException as e: - logger.error( - f"Exception when calling RpmeApi->rpme_full_data_get: {e}" - ) - raise - - # TODO: Get the scope and test these functions - def get_rpme_output_values( - self, - page: Optional[int] = 1, - adm0_code=None, - survey_id=None, - shop_id=None, - market_id=None, - adm0_code_dots="", - ): - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.RpmeApi(api_client) - env = self.env - - try: - api_response = api_instance.rpme_output_values_get( - page=page, - adm0_code=adm0_code, - survey_id=survey_id, - shop_id=shop_id, - market_id=market_id, - adm0_code_dots=adm0_code_dots, - env=env, - ) - logger.info("Successfully retrieved RPME output values") - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - except ApiException as e: - logger.error( - f"Exception when calling RpmeApi->rpme_output_values_get: {e}" - ) - raise - - # TODO: Get the scope and test these functions - def get_rpme_surveys( - self, adm0_code=0, page: Optional[int] = 1, start_date=None, end_date=None - ): - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.RpmeApi(api_client) - env = self.env - - try: - api_response = api_instance.rpme_surveys_get( - adm0_code=adm0_code, - page=page, - start_date=start_date, - end_date=end_date, - env=env, - ) - logger.info("Successfully retrieved RPME surveys") - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - except ApiException as e: - logger.error( - f"Exception when calling RpmeApi->rpme_surveys_get: {e}" - ) - raise - - # TODO: Get the scope and test these functions - def get_rpme_variables(self, page: Optional[int] = 1): - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.RpmeApi(api_client) - env = self.env - - try: - api_response = api_instance.rpme_variables_get(page=page, env=env) - logger.info("Successfully retrieved RPME variables") - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - except ApiException as e: - logger.error( - f"Exception when calling RpmeApi->rpme_variables_get: {e}" - ) - raise - - # TODO: Get the scope and test these functions - def get_rpme_xls_forms( - self, adm0_code=0, page: Optional[int] = 1, start_date=None, end_date=None - ): - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.RpmeApi(api_client) - env = self.env - - try: - api_response = api_instance.rpme_xls_forms_get( - adm0_code=adm0_code, - page=page, - start_date=start_date, - end_date=end_date, - env=env, - ) - logger.info("Successfully retrieved RPME XLS forms") - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - except ApiException as e: - logger.error( - f"Exception when calling RpmeApi->rpme_xls_forms_get: {e}" - ) - raise - - # Add this function to the DataBridgesShapes class - - def get_mfi_xls_forms( - self, adm0_code=0, page: Optional[int] = 1, start_date=None, end_date=None - ): - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.XlsFormsApi(api_client) - env = self.env - - try: - api_response = api_instance.m_fi_xls_forms_get( - adm0_code=adm0_code, - page=page, - start_date=start_date, - end_date=end_date, - env=env, - ) - logger.info("Successfully retrieved MFI XLS forms") - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - return df - except ApiException as e: - logger.error( - f"Exception when calling XlsFormsApi->m_fi_xls_forms_get: {e}" - ) - raise - - def get_mfi_xls_forms_detailed( - self, adm0_code=0, page: Optional[int] = 1, start_date=None, end_date=None - ): - """ - Get a complete list of XLS Forms uploaded on the MFI Data Bridge in a given period of data collection. +# Currency Endpoints (CurrencyApi) +DataBridgesKnots.get_exchange_rates = get_exchange_rates +DataBridgesKnots.get_currency_list = get_currency_list +DataBridgesKnots.get_usd_indirect_quotation = get_usd_indirect_quotation - Args: - adm0_code (int): Code for the country. Defaults to 0. - page (int): Page number for paged results. Defaults to 1. - start_date (str): Starting date for data collection range (YYYY-MM-DD format) - end_date (str): Ending date for data collection range (YYYY-MM-DD format) - - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get detailed XLS forms for country code 231 - >>> detailed_forms = client.get_mfi_xls_forms_detailed(adm0_code=231) - >>> # Get forms within a date range - >>> forms_in_range = client.get_mfi_xls_forms_detailed( - ... adm0_code=231, - ... start_date="2023-01-01", - ... end_date="2023-12-31" - ... ) +# Market Prices Endpoints (MarketPricesApi) +DataBridgesKnots.get_prices = get_prices - Returns: - pandas.DataFrame: DataFrame containing XLS Forms data - """ - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.XlsFormsApi(api_client) - env = self.env - - try: - api_response = api_instance.m_fi_xls_forms_get( - adm0_code=adm0_code, - page=page, - start_date=start_date, - end_date=end_date, - env=env, - ) - logger.info("Successfully retrieved detailed MFI XLS forms") - - # Convert response items to DataFrame - df = pd.DataFrame([item.to_dict() for item in api_response.items]) - df = df.replace({np.nan: None}) - - # Add total items count as DataFrame attribute - df.total_items = api_response.total_items - - return df - - except ApiException as e: - logger.error( - f"Exception when calling XlsFormsApi->m_fi_xls_forms_get: {e}" - ) - raise - - def get_mfi_surveys_base_data( - self, survey_id=None, page: Optional[int] = 1, page_size=20 - ): - """ - Get data that includes the core Market Functionality Index (MFI) fields only by Survey ID. +# Commodity Units Endpoints (CommodityUnitsApi) +DataBridgesKnots.get_commodities_list = get_commodities_list +DataBridgesKnots.get_commodity_units_conversion_list = ( + get_commodity_units_conversion_list +) +DataBridgesKnots.get_commodity_units_list = get_commodity_units_list +DataBridgesKnots.get_commodity_categories_list = get_commodity_categories_list - Args: - survey_id (int): Unique identifier for the collected data - page (int): Page number for paged results - page_size (int): Number of items per page +# Market Endpoints (MarketsApi) +DataBridgesKnots.get_market_geojson_list = get_market_geojson_list +DataBridgesKnots.get_markets_list = get_markets_list +DataBridgesKnots.get_markets_as_csv = get_markets_as_csv +DataBridgesKnots.get_nearby_markets = get_nearby_markets - Examples: - >>> client = DataBridgesShapes("data_bridges_api_config.yaml") - >>> # Get MFI base data for a specific survey - >>> base_data = client.get_mfi_surveys_base_data(survey_id=123) - Returns: - pandas.DataFrame: DataFrame containing MFI base survey data - """ - with data_bridges_client.ApiClient( - self._setup_configuration_and_authentication(self.config) - ) as api_client: - api_instance = data_bridges_client.SurveysApi(api_client) - env = self.env +# RPME Endpoints (RpmeApi) +DataBridgesKnots.get_rpme_base_data = get_rpme_base_data +DataBridgesKnots.get_rpme_full_data = get_rpme_full_data +DataBridgesKnots.get_rpme_output_values = get_rpme_output_values +DataBridgesKnots.get_rpme_surveys = get_rpme_surveys +DataBridgesKnots.get_rpme_variables = get_rpme_variables +DataBridgesKnots.get_rpme_xls_forms = get_rpme_xls_forms - try: - api_response = api_instance.m_fi_surveys_base_data_get( - survey_id=survey_id, page=page, page_size=page_size, env=env - ) - logger.info("Successfully retrieved MFI surveys base data") - return pd.DataFrame(api_response.items) +# Global Outlook +DataBridgesKnots.get_global_outlook = get_global_outlook - except ApiException as e: - logger.error( - f"Exception when calling SurveysApi->m_fi_surveys_base_data_get: {e}" - ) - raise +# Economic Data +DataBridgesKnots.get_economic_indicator_list = get_economic_indicator_list - def get_ipc_and_equivalent_data(self): - pass +# Ipc and CH data +DataBridgesKnots.get_ipc_and_equivalent_data = get_ipc_and_equivalent_data - def get_hotpost_data(self): - pass +# CARI data +DataBridgesKnots.get_cari_data = get_cari_data - def get_aims_data(self): - pass +# Hunger Hotspot data +DataBridgesKnots.get_hunger_hotspot_data = get_hunger_hotspot_data - def get_rpme_data(self): - pass - def get_cari_data(self): - pass +# MFI Endpoints (SurveysApi) +DataBridgesKnots.get_mfi_surveys_base_data = get_mfi_surveys_base_data +DataBridgesKnots.get_mfi_surveys_full_data = get_mfi_surveys_full_data +DataBridgesKnots.get_mfi_surveys = get_mfi_surveys +DataBridgesKnots.get_mfi_surveys_processed_data = get_mfi_surveys_processed_data +DataBridgesKnots.get_mfi_xls_forms = get_mfi_xls_forms +DataBridgesKnots.get_mfi_xls_forms_detailed = get_mfi_xls_forms_detailed if __name__ == "__main__": pass - # import yaml - - # # FOR TESTING - # CONFIG_PATH = r"data_bridges_api_config.yaml" - # client = DataBridgesShapes(CONFIG_PATH) diff --git a/data_bridges_knots/endpoints/__init__.py b/data_bridges_knots/endpoints/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/data_bridges_knots/endpoints/commodityApi.py b/data_bridges_knots/endpoints/commodityApi.py new file mode 100644 index 0000000..0536f32 --- /dev/null +++ b/data_bridges_knots/endpoints/commodityApi.py @@ -0,0 +1,242 @@ +from typing import Optional + +import logging + +import data_bridges_client +import numpy as np +import pandas as pd +from data_bridges_client.rest import ApiException + +logname = "data_bridges_api_calls.log" +logging.basicConfig( + filename=logname, + filemode="a", + format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, +) + +logger = logging.getLogger(__name__) + + +def get_commodities_list( + self, + country_iso3: Optional[str] = None, + commodity_name: Optional[str] = None, + commodity_id: Optional[int] = 0, + page: Optional[int] = 1, + format: Optional[str] = "json", +) -> pd.DataFrame: + """ + Retrieves the detailed list of commodities available in the DataBridges platform. + + Args: + country_iso3 (str, optional): The code to identify the country. It can be an ISO-3166 Alpha 3 code or the VAM internal admin0code. + commodity_name (str, optional): The name, even partial and case insensitive, of a commodity. + commodity_id (int, optional): The exact ID of a commodity. Defaults to 0. + page (int, optional): Page number for paged results. Defaults to 1. + format (str, optional): Output format: 'json' or 'csv'. Defaults to 'json'. + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get full list of commmodities + >>> commodities_list = client.get_commodities_list() + >>> # Get commodities for Tanzania + >>> commodities_df = client.get_commodities_list(country_iso3="TZA") + >>> # Get commodity with name containing "Maize" + >>> maize_df = client.get_commodities_list(commodity_name="Maize") + >>> # Get commodity with specific ID + >>> specific_commodity_df = client.get_commodities_list(commodity_id=123) + + Returns: + pandas.DataFrame: A DataFrame containing the retrieved commodity data. + """ + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.CommoditiesApi(api_client) + env = self.env + + try: + api_response = api_instance.commodities_list_get( + country_code=country_iso3, + commodity_name=commodity_name, + commodity_id=commodity_id, + page=page, + format=format, + env=env, + ) + logger.info("Successfully retrieved commodities list") + + # Convert the response to a DataFrame + if hasattr(api_response, "items"): + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + else: + df = pd.DataFrame([api_response.to_dict()]) + + df = df.replace({np.nan: None}) + return df + + except ApiException as e: + logger.error( + f"Exception when calling CommoditiesApi->commodities_list_get: {e}" + ) + raise + + +def get_commodity_units_conversion_list( + self, + country_iso3: Optional[str] = None, + commodity_id: Optional[int] = 0, + from_unit_id: Optional[int] = 0, + to_unit_id: Optional[int] = 0, + page: Optional[int] = 1, + format: Optional[str] = "json", +) -> pd.DataFrame: + """ + Retrieves conversion factors to Kilogram or Litres for each convertible unit of measure. + + Args: + country_iso3 (str, optional): The code to identify the country. It can be an ISO-3166 Alpha 3 code or the VAM internal admin0code. + commodity_id (int, optional): The exact ID of a Commodity, as found in /Commodities/List. Defaults to 0. + from_unit_id (int, optional): The exact ID of the original unit of measure of the price of a commodity. Defaults to 0. + to_unit_id (int, optional): The exact ID of the converted unit of measure of the price of a commodity. Defaults to 0. + page (int, optional): Page number for paged results. Defaults to 1. + format (str, optional): Output format: 'json' or 'csv'. Defaults to 'json'. + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get full list of commodity units conversions + >>> full_list = client.get_commodity_units_conversion_list() + >>> # Get conversion factors for Tanzania + >>> conversion_factors_df = client.get_commodity_units_conversion_list(country_iso3="TZA") + + Returns: + pandas.DataFrame: A DataFrame containing the retrieved conversion factors. + """ + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.CommodityUnitsApi(api_client) + env = self.env + + try: + api_response = api_instance.commodity_units_conversion_list_get( + country_code=country_iso3, + commodity_id=commodity_id, + from_unit_id=from_unit_id, + to_unit_id=to_unit_id, + page=page, + format=format, + env=env, + ) + logger.info("Successfully retrieved commodity units conversion list") + + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + + except ApiException as e: + logger.error( + f"Exception when calling CommodityUnitsApi->commodity_units_conversion_list_get: {e}" + ) + raise + + +def get_commodity_units_list( + self, + country_iso3: Optional[str] = None, + commodity_unit_name: Optional[str] = None, + commodity_unit_id: Optional[int] = 0, + page: Optional[int] = 1, + format: Optional[str] = "json", +) -> pd.DataFrame: + """ + Retrieves the detailed list of the unit of measure available in DataBridges platform. + + Args: + country_iso3 (str, optional): The code to identify the country. It can be an ISO-3166 Alpha 3 code or the VAM internal admin0code. + commodity_unit_name (str, optional): The name, even partial and case insensitive, of a commodity unit. + commodity_unit_id (int, optional): The exact ID of a commodity unit. Defaults to 0. + page (int, optional): Page number for paged results. Defaults to 1. + format (str, optional): Output format: 'json' or 'csv'. Defaults to 'json'. + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get commodity units for Tanzania + >>> units_df = client.get_commodity_units_list(country_iso3="TZA") + >>> # Get commodity unit with name containing "Kg" + >>> kg_unit_df = client.get_commodity_units_list(commodity_unit_name="Kg") + >>> # Get commodity unit with specific ID + >>> specific_unit_df = client.get_commodity_units_list(commodity_unit_id=5) + + Returns: + pandas.DataFrame: A DataFrame containing the retrieved commodity units data. + """ + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.CommodityUnitsApi(api_client) + env = self.env + + try: + api_response = api_instance.commodity_units_list_get( + country_code=country_iso3, + commodity_unit_name=commodity_unit_name, + commodity_unit_id=commodity_unit_id, + page=page, + format=format, + env=env, + ) + logger.info("Successfully retrieved commodity units list") + + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + + except ApiException as e: + logger.error( + f"Exception when calling CommodityUnitsApi->commodity_units_list_get: {e}" + ) + raise + + +def get_commodity_categories_list( + self, + category_id: Optional[int] = 0, + country_iso3: Optional[str] = None, + category_name: Optional[str] = None, + page: Optional[int] = 1, + format: Optional[str] = "json", +) -> pd.DataFrame: + # Enter a context with an instance of the API client + # Enter a context with an instance of the API client + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + # Create an instance of the API class + api_instance = data_bridges_client.CommoditiesApi(api_client) + env = self.env + + try: + # Provides the list of categories. + api_response = api_instance.commodities_categories_list_get( + country_code=country_iso3, + category_name=category_name, + category_id=category_id, + page=page, + format=format, + env=env, + ) + + logger.info( + "The response of CommoditiesApi->commodities_categories_list_get:\n" + ) + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + except Exception as e: + logger.error( + "Exception when calling CommoditiesApi->commodities_categories_list_get: %s\n" + % e + ) diff --git a/data_bridges_knots/endpoints/currencyApi.py b/data_bridges_knots/endpoints/currencyApi.py new file mode 100644 index 0000000..6c062cc --- /dev/null +++ b/data_bridges_knots/endpoints/currencyApi.py @@ -0,0 +1,186 @@ +from typing import Optional + +import logging +import time + +import data_bridges_client +import numpy as np +import pandas as pd +from data_bridges_client.rest import ApiException + +logname = "data_bridges_api_calls.log" +logging.basicConfig( + filename=logname, + filemode="a", + format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, +) + +logger = logging.getLogger(__name__) + + +def get_exchange_rates(self, country_iso3: str, page_size: int = 1000) -> pd.DataFrame: + """Retrieves exchange rates for a given country from the Data Bridges API. + + Args: + country_iso3 (str): The ISO3 country code + page_size (int, optional): Number of items per page. Defaults to 1000 + + Returns: + pd.DataFrame: DataFrame containing exchange rate data with columns: + - date: Date of exchange rate + - rate: Exchange rate value + - currency: Currency code + And other relevant exchange rate information + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get exchange rates for Ethiopia + >>> rates_df = client.get_exchange_rates("ETH") + >>> # Check latest exchange rate + >>> latest_rate = rates_df.sort_values('date').iloc[-1] + + Raises: + ApiException: If there's an error calling the Exchange rates API + """ + + responses = [] + total_items = 20 + max_item = 0 + page = 0 + while total_items > max_item: + page += 1 + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.CurrencyApi(api_client) + env = self.env + + try: + api_exchange_rates = api_instance.currency_usd_indirect_quotation_get( + country_iso3=country_iso3, format="json", page=page, env=env + ) + responses.extend(item.to_dict() for item in api_exchange_rates.items) + total_items = api_exchange_rates.total_items + logger.info("Fetching page %s", page) + max_item = page * page_size + time.sleep(1) + except ApiException as e: + logger.error( + "Exception when calling Exchange rates data-> : %s\n", + e, + ) + raise + df = pd.DataFrame(responses) + df = df.replace({np.nan: None}) + return df + + +def get_currency_list( + self, + country_iso3: Optional[str] = None, + currency_name: Optional[str] = None, + currency_id: Optional[str] = 0, + page: Optional[int] = 1, + format: Optional[str] = "json", +): + """ + Returns the list of currencies available in the internal VAM database, with Currency 3-letter code, matching with ISO 4217. + + Args: + country_iso3 (str, optional): The code to identify the country. It can be an ISO-3166 Alpha 3 code or the VAM internal admin0code. + currency_name (str, optional): Currency 3-letter code, matching with ISO 4217. + currency_id (int, optional): Unique code to identify the currency in internal VAM currencies. Defaults to 0. + page (int, optional): Page number for paged results. Defaults to 1. + format (str, optional): Output format: 'json' or 'csv'. Defaults to 'json'. + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get currencies for Tanzania + >>> currencies_df = client.get_currency_list(country_iso3="TZA") + >>> # Get currency with name "ETB" + >>> etb_df = client.get_currency_list(currency_name="ETB") + >>> # Get currency with specific ID + >>> specific_currency_df = client.get_currency_list(currency_id=1) + + Returns: + pandas.DataFrame: A DataFrame containing the retrieved currency data. + """ + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.CurrencyApi(api_client) + env = self.env + + try: + api_response = api_instance.currency_list_get( + country_code=country_iso3, + currency_name=currency_name, + currency_id=currency_id, + page=page, + format=format, + env=env, + ) + logger.info("Successfully retrieved currency list") + + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + + except ApiException as e: + logger.error(f"Exception when calling CurrencyApi->currency_list_get: {e}") + raise + + +def get_usd_indirect_quotation( + self, + country_iso3: Optional[str] = "", + currency_name: Optional[str] = "", + page: Optional[int] = 1, + format: Optional[str] = "json", +): + """ + Returns the value of the Exchange rates from Trading Economics, for official rates, and DataViz for unofficial rates. + + Args: + country_iso3 (str, optional): The code to identify the country. Must be a ISO-3166 Alpha 3 code. Defaults to ''. + currency_name (str, optional): The ISO3 code for the currency, based on ISO4217. Defaults to ''. + page (int, optional): Page number for paged results. Defaults to 1. + format (str, optional): Output format: 'json' or 'csv'. Defaults to 'json'. + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get USD indirect quotation for Ethiopia + >>> usd_df = client.get_usd_indirect_quotation(country_iso3="ETH") + >>> # Get USD indirect quotation for currency "ETB" + >>> etb_usd_df = client.get_usd_indirect_quotation(currency_name="ETB") + + Returns: + pandas.DataFrame: A DataFrame containing the retrieved exchange rate data. + """ + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.CurrencyApi(api_client) + env = self.env + + try: + api_response = api_instance.currency_usd_indirect_quotation_get( + country_iso3=country_iso3, + currency_name=currency_name, + page=page, + format=format, + env=env, + ) + logger.info("Successfully retrieved USD indirect quotation data") + + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + + except ApiException as e: + logger.error( + f"Exception when calling CurrencyApi->currency_usd_indirect_quotation_get: {e}" + ) + raise diff --git a/data_bridges_knots/endpoints/economicDataApi.py b/data_bridges_knots/endpoints/economicDataApi.py new file mode 100644 index 0000000..a0b0687 --- /dev/null +++ b/data_bridges_knots/endpoints/economicDataApi.py @@ -0,0 +1,67 @@ +from typing import Optional + +import logging + +import data_bridges_client +import numpy as np +import pandas as pd + +logname = "data_bridges_api_calls.log" +logging.basicConfig( + filename=logname, + filemode="a", + format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, +) + +logger = logging.getLogger(__name__) + + +def get_economic_indicator_list( + self, + page: Optional[int] = 1, + indicator_name: Optional[str] = "", + country_iso3: Optional[str] = "", + format: Optional[str] = "json", +): + """ + Returns the lists of indicators for which Vulnerability Analysis and Mapping - Economic and Market Analysis Unit has redistribution licensing from Trading Economics. + + Args: + page (int, optional): Page number for paged results. Defaults to 1. + indicator_name (str, optional): Unique indicator name. Defaults to ''. + iso3 (str, optional): The code to identify the country. Must be a ISO-3166 Alpha 3 code. Defaults to ''. + format (str, optional): Output format: 'json' or 'csv'. Defaults to 'json'. + + Returns: + pandas.DataFrame: A DataFrame containing the retrieved economic indicator data. + """ + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + # Create an instance of the API class + api_instance = data_bridges_client.EconomicDataApi(api_client) + + try: + # Returns the lists of indicators. + api_response = api_instance.economic_data_indicator_list_get( + page=page, + indicator_name=indicator_name, + iso3=country_iso3, + format=format, + env=self.env, + ) + logger.info( + "The response of EconomicDataApi->economic_data_indicator_list_get:\n" + ) + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + return api_response + except Exception as e: + logger.error( + "Exception when calling EconomicDataApi->economic_data_indicator_list_get: %s", + e, + ) + raise diff --git a/data_bridges_knots/endpoints/globalOutlookApi.py b/data_bridges_knots/endpoints/globalOutlookApi.py new file mode 100644 index 0000000..6db3cdc --- /dev/null +++ b/data_bridges_knots/endpoints/globalOutlookApi.py @@ -0,0 +1,91 @@ +from typing import Literal, Optional + +import logging + +import data_bridges_client +import pandas as pd + +logname = "data_bridges_api_calls.log" +logging.basicConfig( + filename=logname, + filemode="a", + format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, +) + +logger = logging.getLogger(__name__) + + +# GlobalOutlookApi global_outlook_country_latest_get GET /GlobalOutlook/CountryLatest Return the latest country dataset of number of acutely food insecure (in thousands) based on WFP's Global Outlook. +# GlobalOutlookApi global_outlook_global_latest_get GET /GlobalOutlook/GlobalLatest Return the latest global dataset of number of acutely food insecure (in millions) based on WFP's Global Outlook. +# GlobalOutlookApi global_outlook_regional_latest_get GET /GlobalOutlook/RegionalLatest Return the latest regional dataset of number of acutely food insecure (in millions) based on WFP's Global Outlook. + + +# FIXME: Get scopes to test this function +def get_global_outlook( + self, + data_type: Literal["country_latest", "global_latest", "regional_latest"], + page: Optional[int] = None, +) -> pd.DataFrame: + """Retrieves data from the Global Outlook API. + + The Global Outlook API provides access to WFP’s forward-looking analysis and + aggregated insights at different geographical levels, including country, + regional, and global summaries. + + Args: + data_type (str): The type of Global Outlook data to retrieve. Must be one of: + - 'country_latest': Latest data at country level + - 'global_latest': Latest global aggregated data + - 'regional_latest': Latest data aggregated by region + page (int, optional): Page number for paginated results. Currently not used + for latest endpoints. Defaults to None. + + Returns: + pd.DataFrame: DataFrame containing Global Outlook data for the selected scope. + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get latest country-level outlook + >>> country_data = client.get_global_outlook("country_latest") + >>> # Get global outlook summary + >>> global_data = client.get_global_outlook("global_latest") + >>> # Get regional outlook data + >>> regional_data = client.get_global_outlook("regional_latest") + + Raises: + ValueError: If data_type is not one of the allowed values + ApiException: If there is an error accessing the Global Outlook API + """ + + # Enter a context with an instance of the API client + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + # Create an instance of the API class + api_instance = data_bridges_client.GlobalOutlookApi(api_client) + env = ( + self.env + ) # str | Environment. * `prod` - api.vam.wfp.org * `dev` - dev.api.vam.wfp.org (optional) + + try: + if data_type == "country_latest": + api_response = api_instance.global_outlook_country_latest_get(env=env) + elif data_type == "global_latest": + api_response = api_instance.global_outlook_global_latest_get(env=env) + + elif data_type == "regional_latest": + api_response = api_instance.global_outlook_regional_latest_get(env=env) + else: + raise ValueError(f"Invalid data_type: {data_type}") + logger.info( + f"Successfully retrieved Global Outlook data for type: {data_type}" + ) + return pd.DataFrame([item.to_dict() for item in api_response.items]) + + except Exception as e: + logger.error( + "Exception when calling GlobalOutlookApi->%s: %s", data_type, e + ) + raise diff --git a/data_bridges_knots/endpoints/householdApi.py b/data_bridges_knots/endpoints/householdApi.py new file mode 100644 index 0000000..18fc280 --- /dev/null +++ b/data_bridges_knots/endpoints/householdApi.py @@ -0,0 +1,271 @@ +from typing import Optional + +import logging +import time + +import data_bridges_client +import numpy as np +import pandas as pd +from data_bridges_client.rest import ApiException + +from data_bridges_knots.helpers import get_adm0_code + +logname = "data_bridges_api_calls.log" +logging.basicConfig( + filename=logname, + filemode="a", + format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, +) + +logger = logging.getLogger(__name__) + + +def get_household_survey( + self, survey_id: int, access_type: str, page_size: Optional[int] = 600 +) -> pd.DataFrame: + """Retrieves household survey data using the specified access type. + + Args: + survey_id (int]): The ID of the survey to retrieve + access_type (str): The type of access to use. Must be one of: + - 'draft': Draft internal base data (requires API key) + - 'full': Complete survey data (requires API key). Data is returned as inserted by the country office and it might contain PII and unstandardized fields. + + - 'official': Official use base data. Only data mapped against the standards is returned. + - 'public': Public base data + page_size (int, optional): Number of items per page. Defaults to 600 + + Returns: + pd.DataFrame: DataFrame containing survey data with columns specific to the + access type and survey structure + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get full, unmapped survey data + >>> full_data = client.get_household_survey(3094, "full") + >>> # Get standard data for official use (no PII) + >>> official_data = client.get_household_survey(3094, "official") + + Raises: + KeyError: If access_type is not one of the allowed values + ApiException: If there's an error accessing the API + """ + responses = [] + total_items = 1 + max_item = 0 + page = 0 + + while total_items > max_item: + page += 1 + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.IncubationApi(api_client) + env = self.env + + try: + logger.info(f"Calling get_household_survey for survey {survey_id}") + # Select appropriate API call based on access_type + api_call = { + "full": api_instance.household_full_data_get, + "draft": api_instance.household_draft_internal_base_data_get, + "official": api_instance.household_official_use_base_data_get, + "public": api_instance.household_public_base_data_get, + }.get(access_type) + + if access_type in ["full", "draft"]: + try: + api_survey = api_call( + self.data_bridges_api_key, + survey_id=survey_id, + page=page, + page_size=page_size, + env=env, + ) + except ApiException as e: + logger.error( + f"API key required when calling Household data-> '{access_type}': {e}" + ) + raise + else: + api_survey = api_call( + survey_id=survey_id, page=page, page_size=page_size, env=env + ) + + logger.info(f"Fetching page {page}") + logger.info(f"Items: {len(api_survey.items)}") + responses.extend(api_survey.items) + total_items = api_survey.total_items + max_item = len(api_survey.items) + max_item + time.sleep(1) + + except ApiException as e: + logger.error( + f"Exception when calling Household data-> {access_type}{e}" + ) + raise + + df = pd.DataFrame(responses) + return df + + +def get_household_surveys_list( + self, + country_iso3: Optional[int] = None, + page: Optional[int] = 1, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + survey_id: Optional[int] = None, +) -> pd.DataFrame: + """Retrieves a list of household surveys for a country with their metadata. + + Args: + country_iso3 (str, optional): ISO3 Country code + page (int, optional): Page number for paginated results. Defaults to 1 + start_date (str, optional): Start date filter in ISO format (YYYY-MM-DD) + end_date (str, optional): End date filter in ISO format (YYYY-MM-DD) + survey_id (int, optional): Specific survey ID to retrieve + + Returns: + pd.DataFrame: DataFrame containing survey metadata with columns: + - survey_id: Unique identifier for the survey + - xls_form_id: ID of the questionnaire form used + - title: Survey title + - country: Country name + - start_date: Survey start date + - end_date: Survey end date + And other metadata fields + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get all surveys for a country + >>> surveys = client.get_household_surveys_list(country_iso3="COG") + >>> # Get surveys within date range + >>> surveys = client.get_household_surveys_list( + ... country_iso3="COG", + ... start_date="2024-01-01", + ... end_date="2024-12-31" + ... ) + + Raises: + ApiException: If there's an error accessing the API + """ + + adm0code = get_adm0_code(country_iso3) if country_iso3 else None + + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.IncubationApi(api_client) + env = self.env + + try: + api_response = api_instance.household_surveys_get( + adm0_code=adm0code, + page=page, + start_date=start_date, + end_date=end_date, + survey_id=survey_id, + env=env, + ) + logger.info("Successfully retrieved household surveys") + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + except ApiException as e: + logger.error( + f"Exception when calling IncubationApi->household_surveys_get: {e}" + ) + raise + + +def get_household_xlsform_definition(self, xls_form_id: int) -> pd.DataFrame: + """Retrieves the complete XLS Form definition for a questionnaire. + + Args: + xls_form_id (int): The ID of the questionnaire form to retrieve + + Returns: + pd.DataFrame: DataFrame containing the form definition with columns: + - fields: List of field definitions + - choices: Available choices for select questions + - settings: Form settings + And other form structure information + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get form definition + >>> form_def = client.get_household_xlsform_definition(2067) + >>> # Access form fields + >>> fields = form_def['fields'].iloc[0] + + Raises: + ApiException: If there's an error accessing the API + """ + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.IncubationApi(api_client) + env = self.env + + try: + api_response = api_instance.xls_forms_definition_get( + xls_form_id=xls_form_id, env=env + ) + logger.info( + f"Successfully retrieved XLS Form definition for ID: {xls_form_id}" + ) + self.xlsform = pd.DataFrame([item.to_dict() for item in api_response]) + return self.xlsform + + except ApiException as e: + logger.error( + f"Exception when calling IncubationApi->xls_forms_definition_get: {e}" + ) + raise + + +def get_household_questionnaire(self, xls_form_id: int) -> pd.DataFrame: + """Extracts the questionnaire structure from an XLS Form definition. + + Args: + xls_form_id (int): The ID of the questionnaire form to process + + Returns: + pd.DataFrame: DataFrame containing the questionnaire structure with + one row per field in the form + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> questionnaire = client.get_household_questionnaire(2075) + """ + if self.xlsform is None: + self.xlsform = self.get_household_xlsform_definition(xls_form_id) + return pd.DataFrame(list(self.xlsform.fields)[0]) + + +def get_choice_list(self, xls_form_id: int) -> pd.DataFrame: + """Extracts choice lists from a questionnaire form definition. + + Args: + xls_form_id (int): The ID of the questionnaire form to process + + Returns: + pd.DataFrame: DataFrame containing choice lists with columns: + - name: Name of the choice list + - value: Choice value/code + - label: Human-readable choice label + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> choices = client.get_choice_list(123) + """ + questionnaire = self.get_household_questionnaire(xls_form_id) + + choiceList = pd.json_normalize(questionnaire["choiceList"]).dropna() + choices = choiceList.explode("choices") + choices["value"] = choices["choices"].apply(lambda x: x["name"]) + choices["label"] = choices["choices"].apply(lambda x: x["label"]) + return choices[["name", "value", "label"]] diff --git a/data_bridges_knots/endpoints/hungerHotpotApi.py b/data_bridges_knots/endpoints/hungerHotpotApi.py new file mode 100644 index 0000000..1a0e14e --- /dev/null +++ b/data_bridges_knots/endpoints/hungerHotpotApi.py @@ -0,0 +1,3 @@ +# TODO: HungerHotspotApi +def get_hunger_hotspot_data(self): + pass diff --git a/data_bridges_knots/endpoints/incubationApi.py b/data_bridges_knots/endpoints/incubationApi.py new file mode 100644 index 0000000..820a0ea --- /dev/null +++ b/data_bridges_knots/endpoints/incubationApi.py @@ -0,0 +1,9 @@ +# TODO: IncubationApi +# FIXME: Get scopes to test this function +def get_cari_data(self, admin_level="admin0"): + if admin_level == "admin0": + pass + elif admin_level == "admin1": + pass + else: + raise ValueError("admin_level must be either 'admin0' or 'admin1'") diff --git a/data_bridges_knots/endpoints/ipcChApi.py b/data_bridges_knots/endpoints/ipcChApi.py new file mode 100644 index 0000000..e442f2b --- /dev/null +++ b/data_bridges_knots/endpoints/ipcChApi.py @@ -0,0 +1,3 @@ +# TODO: IpcchApi +def get_ipc_and_equivalent_data(self): + pass diff --git a/data_bridges_knots/endpoints/marketPricesApi.py b/data_bridges_knots/endpoints/marketPricesApi.py new file mode 100644 index 0000000..a95f11b --- /dev/null +++ b/data_bridges_knots/endpoints/marketPricesApi.py @@ -0,0 +1,119 @@ +from typing import Optional + +import logging +import time +from datetime import date + +import data_bridges_client +import numpy as np +import pandas as pd +from data_bridges_client.rest import ApiException + +logname = "data_bridges_api_calls.log" +logging.basicConfig( + filename=logname, + filemode="a", + format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, +) + +logger = logging.getLogger(__name__) + + +def get_prices( + self, + country_iso3: str, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + page_size: int = 1000, + market_id: int = 0, + commodity_id: int = 0, + currency_id: int = 0, + price_flag: str = "", + latest_value_only: bool = False, +) -> pd.DataFrame: + """Fetches market price data for a given country within a specified date range. + + Args: + country_iso3 (str): The ISO 3-letter country code + start_date (str, optional): Start date in ISO format (e.g., '2022-01-01'). + If None, defaults to today's date. + end_date (str, optional): End date in ISO format (e.g., '2022-01-01'). + If None, defaults to today's date. + page_size (int, optional): Number of items per page. Defaults to 1000. + market_id (int, optional): Unique ID of a Market. Defaults to 0. + commodity_id (int, optional): The exact ID of a Commodity. Defaults to 0. + currency_id (int, optional): The exact ID of a currency. Defaults to 0. + price_flag (str, optional): Type of price data: [actual|aggregate|estimated|forecasted]. Defaults to ''. + latest_value_only (bool, optional): Whether to return only latest values. Defaults to False. + + Returns: + pd.DataFrame: DataFrame containing market price data + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Basic usage with dates + >>> df_prices = client.get_prices("KEN", start_date="2025-01-01", end_date="2025-12-31") + >>> # Using additional filters + >>> df_prices = client.get_prices( + ... "KEN", + ... start_date="2025-01-01", + ... market_id=123, + ... commodity_id=456, + ... price_flag="actual" + ... ) + """ + if start_date: + # Format the date according to RFC 3339 standard + start_date = date.fromisoformat(start_date).strftime("%Y-%m-%dT%H:%M:%S+01:00") + else: + start_date = date.today().strftime("%Y-%m-%dT%H:%M:%S+01:00") + + if end_date: + # Format the date according to RFC 3339 standard + end_date = date.fromisoformat(end_date).strftime("%Y-%m-%dT%H:%M:%S+01:00") + else: + end_date = date.today().strftime("%Y-%m-%dT%H:%M:%S+01:00") + + responses = [] + total_items = 20 + max_item = 0 + page = 0 + while total_items > max_item: + page += 1 + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.MarketPricesApi(api_client) + env = self.env + + try: + api_prices = api_instance.market_prices_price_monthly_get( + country_code=country_iso3, + market_id=market_id, + commodity_id=commodity_id, + currency_id=currency_id, + price_flag=price_flag, + format="json", + page=page, + env=env, + start_date=start_date, + end_date=end_date, + latest_value_only=latest_value_only, + ) + responses.extend(item.to_dict() for item in api_prices.items) + total_items = api_prices.total_items + logger.info("Fetching page %s/n", page) + max_item = page * page_size + time.sleep(1) + except ApiException as e: + logger.error( + "Exception when calling Market price data->market_prices_price_monthly_get: %s\n", + e, + ) + raise + + df = pd.DataFrame(responses) + df = df.replace({np.nan: None}) + return df diff --git a/data_bridges_knots/endpoints/marketsApi.py b/data_bridges_knots/endpoints/marketsApi.py new file mode 100644 index 0000000..b6af819 --- /dev/null +++ b/data_bridges_knots/endpoints/marketsApi.py @@ -0,0 +1,203 @@ +from typing import Optional + +import logging + +import data_bridges_client +import numpy as np +import pandas as pd +from data_bridges_client.rest import ApiException + +from data_bridges_knots.helpers import get_adm0_code + +logname = "data_bridges_api_calls.log" +logging.basicConfig( + filename=logname, + filemode="a", + format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, +) + +logger = logging.getLogger(__name__) + + +def get_market_geojson_list(self, country_iso3: str = None): + """Returns a list of geo-referenced markets in a specific country.""" + if country_iso3 is None: + raise ValueError("country_iso3 parameter is required") + else: + adm0code = get_adm0_code(country_iso3) + + # Enter a context with an instance of the API client + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + # Create an instance of the API class + api_instance = data_bridges_client.MarketsApi(api_client) + + try: + # Provide a list of geo referenced markets in a specific country + api_response = api_instance.markets_geo_json_list_get( + adm0code=adm0code, env=self.env + ) + logger.info("The response of MarketsApi->markets_geo_json_list_get:\n") + + geojson_dict = api_response.model_dump() + + return geojson_dict + return api_response + except Exception as e: + logger.error( + "Exception when calling MarketsApi->markets_geo_json_list_get: %s", + e, + ) + raise + + +def get_markets_list( + self, country_iso3: Optional[str] = None, page: Optional[int] = 1 +) -> pd.DataFrame: + """Retrieves a complete list of markets in a country. + + Args: + country_iso3 (str, optional): The ISO3 code to identify the country. Defaults to None. + page (int, optional): Page number for paginated results. Defaults to 1. + + Returns: + pd.DataFrame: DataFrame containing market information with columns: + - market_id: Unique identifier for the market + - market_name: Name of the market + - adm0_code: Country administrative code + - latitude: Market location latitude + - longitude: Market location longitude + And other market-related fields + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get markets for Afghanistan + >>> afg_markets = client.get_markets_list("AFG") + + Raises: + ApiException: If there's an error accessing the Markets API + """ + # Enter a context with an instance of the API client + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + # Create an instance of the API class + api_instance = data_bridges_client.MarketsApi(api_client) + format = "json" # str | Output format: [JSON|CSV] Json is the default value (optional) (default to 'json') + env = ( + self.env + ) # str | Environment. * `prod` - api.vam.wfp.org * `dev` - dev.api.vam.wfp.org (optional) + + try: + # Get a complete list of markets in a country + api_response = api_instance.markets_list_get( + country_code=country_iso3, page=page, format=format, env=env + ) + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + except Exception as e: + logger.error("Exception when calling MarketsApi->markets_list_get: %s", e) + raise + + +def get_markets_as_csv( + self, country_iso3: Optional[str] = None, local_names: bool = False +) -> str: + """Retrieves a complete list of markets in a country in CSV format. + + Args: + country_iso3 (str, optional): Country administrative code. Defaults to None. + local_names (bool, optional): If True, market and region names will be + localized if available. Defaults to False. + + Returns: + str: CSV formatted string containing market data + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get markets CSV for Afghanistan + >>> markets_csv = client.get_markets_as_csv("AFG") + >>> # Get localized market names + >>> local_markets = client.get_markets_as_csv("AFG", local_names=True) + + Raises: + ApiException: If there's an error accessing the Markets API + """ + + adm0code = get_adm0_code(country_iso3) + + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.MarketsApi(api_client) + local_names = False # bool | If true the name of markets and regions will be localized if available (optional) (default to False) + + try: + # Get a complete list of markets in a country + api_response = api_instance.markets_markets_as_csv_get( + adm0code=adm0code, local_names=local_names, env=self.env + ) + logger.info("The response of MarketsApi->markets_markets_as_csv_get:\n") + return api_response + except Exception as e: + logger.error( + "Exception when calling MarketsApi->markets_markets_as_csv_get: %s", + e, + ) + raise + + +def get_nearby_markets( + self, country_iso3: str = None, lat: float = None, lng: float = None +) -> pd.DataFrame: + """Finds markets near a given location within a 15km distance. + + Args: + country_iso3 (str): Country administrative code. Defaults to None. + lat (float): Latitude of the search point. Defaults to None. + lng (float): Longitude of the search point. Defaults to None. + + Returns: + pd.DataFrame: DataFrame containing nearby markets with columns: + - market_id: Unique identifier for the market + - market_name: Name of the market + - distance: Distance from search point in kilometers + - latitude: Market location latitude + - longitude: Market location longitude + And other market-related fields + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Find markets near coordinates in Afghanistan + >>> nearby = client.get_nearby_markets("AFG", 34.515, 69.208) + >>> # Sort markets by distance + >>> closest = nearby.sort_values('distance').iloc[0] + + Raises: + ApiException: If there's an error accessing the Markets API + """ + + adm0code = get_adm0_code(country_iso3) + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.MarketsApi(api_client) + env = self.env + + try: + api_response = api_instance.markets_nearby_markets_get( + adm0code=adm0code, lat=lat, lng=lng, env=env + ) + logger.info("Successfully retrieved nearby markets") + df = pd.DataFrame([item.to_dict() for item in api_response]) + df = df.replace({np.nan: None}) + return df + except ApiException as e: + logger.error( + f"Exception when calling MarketsApi->markets_nearby_markets_get: {e}" + ) + raise diff --git a/data_bridges_knots/endpoints/rpmeApi.py b/data_bridges_knots/endpoints/rpmeApi.py new file mode 100644 index 0000000..138217e --- /dev/null +++ b/data_bridges_knots/endpoints/rpmeApi.py @@ -0,0 +1,179 @@ +from typing import Optional + +import logging + +import data_bridges_client +import numpy as np +import pandas as pd +from data_bridges_client.rest import ApiException + +logname = "data_bridges_api_calls.log" +logging.basicConfig( + filename=logname, + filemode="a", + format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, +) + +logger = logging.getLogger(__name__) + + +# TODO: Get the scope and test these functions +def get_rpme_base_data(self, survey_id=None, page: Optional[int] = 1, page_size=20): + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.RpmeApi(api_client) + env = self.env + + try: + api_response = api_instance.rpme_base_data_get( + survey_id=survey_id, page=page, page_size=page_size, env=env + ) + logger.info("Successfully retrieved RPME base data") + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + except ApiException as e: + logger.error(f"Exception when calling RpmeApi->rpme_base_data_get: {e}") + raise + + +# TODO: Get the scope and test these functions +def get_rpme_full_data( + self, + survey_id=None, + format: Optional[str] = "json", + page: Optional[int] = 1, + page_size=20, +): + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.RpmeApi(api_client) + env = self.env + + try: + api_response = api_instance.rpme_full_data_get( + survey_id=survey_id, + format=format, + page=page, + page_size=page_size, + env=env, + ) + logger.info("Successfully retrieved RPME full data") + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + except ApiException as e: + logger.error(f"Exception when calling RpmeApi->rpme_full_data_get: {e}") + raise + + +# TODO: Get the scope and test these functions +def get_rpme_output_values( + self, + page: Optional[int] = 1, + adm0_code=None, + survey_id=None, + shop_id=None, + market_id=None, + adm0_code_dots="", +): + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.RpmeApi(api_client) + env = self.env + + try: + api_response = api_instance.rpme_output_values_get( + page=page, + adm0_code=adm0_code, + survey_id=survey_id, + shop_id=shop_id, + market_id=market_id, + adm0_code_dots=adm0_code_dots, + env=env, + ) + logger.info("Successfully retrieved RPME output values") + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + except ApiException as e: + logger.error(f"Exception when calling RpmeApi->rpme_output_values_get: {e}") + raise + + +# TODO: Get the scope and test these functions +def get_rpme_surveys( + self, adm0_code=0, page: Optional[int] = 1, start_date=None, end_date=None +): + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.RpmeApi(api_client) + env = self.env + + try: + api_response = api_instance.rpme_surveys_get( + adm0_code=adm0_code, + page=page, + start_date=start_date, + end_date=end_date, + env=env, + ) + logger.info("Successfully retrieved RPME surveys") + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + except ApiException as e: + logger.error(f"Exception when calling RpmeApi->rpme_surveys_get: {e}") + raise + + +# TODO: Get the scope and test these functions +def get_rpme_variables(self, page: Optional[int] = 1): + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.RpmeApi(api_client) + env = self.env + + try: + api_response = api_instance.rpme_variables_get(page=page, env=env) + logger.info("Successfully retrieved RPME variables") + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + except ApiException as e: + logger.error(f"Exception when calling RpmeApi->rpme_variables_get: {e}") + raise + + +# TODO: Get the scope and test these functions +def get_rpme_xls_forms( + self, adm0_code=0, page: Optional[int] = 1, start_date=None, end_date=None +): + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.RpmeApi(api_client) + env = self.env + + try: + api_response = api_instance.rpme_xls_forms_get( + adm0_code=adm0_code, + page=page, + start_date=start_date, + end_date=end_date, + env=env, + ) + logger.info("Successfully retrieved RPME XLS forms") + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + except ApiException as e: + logger.error(f"Exception when calling RpmeApi->rpme_xls_forms_get: {e}") + raise diff --git a/data_bridges_knots/endpoints/surveysApi.py b/data_bridges_knots/endpoints/surveysApi.py new file mode 100644 index 0000000..fea5fbd --- /dev/null +++ b/data_bridges_knots/endpoints/surveysApi.py @@ -0,0 +1,243 @@ +from typing import Optional + +import logging + +import data_bridges_client +import numpy as np +import pandas as pd +from data_bridges_client.rest import ApiException + +logname = "data_bridges_api_calls.log" +logging.basicConfig( + filename=logname, + filemode="a", + format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, +) + +logger = logging.getLogger(__name__) + + +def get_mfi_surveys_base_data( + self, survey_id=None, page: Optional[int] = 1, page_size=20 +): + """ + Get data that includes the core Market Functionality Index (MFI) fields only by Survey ID. + + Args: + survey_id (int): Unique identifier for the collected data + page (int): Page number for paged results + page_size (int): Number of items per page + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get MFI base data for a specific survey + >>> base_data = client.get_mfi_surveys_base_data(survey_id=123) + + Returns: + pandas.DataFrame: DataFrame containing MFI base survey data + """ + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.SurveysApi(api_client) + env = self.env + + try: + api_response = api_instance.m_fi_surveys_base_data_get( + survey_id=survey_id, page=page, page_size=page_size, env=env + ) + logger.info("Successfully retrieved MFI surveys base data") + return pd.DataFrame(api_response.items) + + except ApiException as e: + logger.error( + f"Exception when calling SurveysApi->m_fi_surveys_base_data_get: {e}" + ) + raise + + +def get_mfi_surveys_full_data( + self, survey_id=None, page: Optional[int] = 1, page_size=20 +): + """ + Get a full dataset that includes all the fields included in the survey in addition to the core Market Functionality Index (MFI) fields by Survey ID. + """ + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.SurveysApi(api_client) + env = self.env + try: + api_response = api_instance.m_fi_surveys_full_data_get( + survey_id=survey_id, + format="json", + page=page, + page_size=page_size, + env=env, + ) + logger.info("Successfully retrieved MFI surveys full data") + df = pd.DataFrame(api_response.items) + return df + except ApiException as e: + logger.error( + f"Exception when calling SurveysApi->m_fi_surveys_full_data_get: {e}" + ) + raise + + +def get_mfi_surveys( + self, adm0_code=0, page: Optional[int] = 1, start_date=None, end_date=None +): + """ + Retrieve Survey IDs, their corresponding XLS Form IDs, and Base XLS Form of all MFI surveys conducted in a country. + """ + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.SurveysApi(api_client) + env = self.env + + try: + api_response = api_instance.m_fi_surveys_get( + adm0_code=adm0_code, + page=page, + start_date=start_date, + end_date=end_date, + env=env, + ) + logger.info("Successfully retrieved MFI surveys list") + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + except ApiException as e: + logger.error(f"Exception when calling SurveysApi->m_fi_surveys_get: {e}") + raise + + +def get_mfi_surveys_processed_data( + self, + survey_id=None, + page: Optional[int] = 1, + page_size=20, + format: Optional[str] = "json", + start_date=None, + end_date=None, + adm0_codes=None, + market_id=None, + survey_type=None, +): + """ + Get MFI processed data in long format. + """ + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.SurveysApi(api_client) + env = self.env + + try: + api_response = api_instance.m_fi_surveys_processed_data_get( + survey_id=survey_id, + page=page, + page_size=page_size, + format=format, + start_date=start_date, + end_date=end_date, + adm0_codes=adm0_codes, + market_id=market_id, + survey_type=survey_type, + env=env, + ) + logger.info("Successfully retrieved MFI surveys processed data") + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + except ApiException as e: + logger.error( + f"Exception when calling SurveysApi->m_fi_surveys_processed_data_get: {e}" + ) + raise + + +def get_mfi_xls_forms( + self, adm0_code=0, page: Optional[int] = 1, start_date=None, end_date=None +): + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.XlsFormsApi(api_client) + env = self.env + + try: + api_response = api_instance.m_fi_xls_forms_get( + adm0_code=adm0_code, + page=page, + start_date=start_date, + end_date=end_date, + env=env, + ) + logger.info("Successfully retrieved MFI XLS forms") + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + return df + except ApiException as e: + logger.error(f"Exception when calling XlsFormsApi->m_fi_xls_forms_get: {e}") + raise + + +def get_mfi_xls_forms_detailed( + self, adm0_code=0, page: Optional[int] = 1, start_date=None, end_date=None +): + """ + Get a complete list of XLS Forms uploaded on the MFI Data Bridge in a given period of data collection. + + Args: + adm0_code (int): Code for the country. Defaults to 0. + page (int): Page number for paged results. Defaults to 1. + start_date (str): Starting date for data collection range (YYYY-MM-DD format) + end_date (str): Ending date for data collection range (YYYY-MM-DD format) + + Examples: + >>> client = DataBridgesShapes("data_bridges_api_config.yaml") + >>> # Get detailed XLS forms for country code 231 + >>> detailed_forms = client.get_mfi_xls_forms_detailed(adm0_code=231) + >>> # Get forms within a date range + >>> forms_in_range = client.get_mfi_xls_forms_detailed( + ... adm0_code=231, + ... start_date="2023-01-01", + ... end_date="2023-12-31" + ... ) + + Returns: + pandas.DataFrame: DataFrame containing XLS Forms data + """ + with data_bridges_client.ApiClient( + self._setup_configuration_and_authentication(self.config) + ) as api_client: + api_instance = data_bridges_client.XlsFormsApi(api_client) + env = self.env + + try: + api_response = api_instance.m_fi_xls_forms_get( + adm0_code=adm0_code, + page=page, + start_date=start_date, + end_date=end_date, + env=env, + ) + logger.info("Successfully retrieved detailed MFI XLS forms") + + # Convert response items to DataFrame + df = pd.DataFrame([item.to_dict() for item in api_response.items]) + df = df.replace({np.nan: None}) + + # Add total items count as DataFrame attribute + df.total_items = api_response.total_items + + return df + + except ApiException as e: + logger.error(f"Exception when calling XlsFormsApi->m_fi_xls_forms_get: {e}") + raise diff --git a/endpoints_coverage_tracker.csv b/endpoints_coverage_tracker.csv new file mode 100644 index 0000000..636e2c7 --- /dev/null +++ b/endpoints_coverage_tracker.csv @@ -0,0 +1,48 @@ +Class,Method,HTTP request,Description,DataBridgesKnots +CommoditiesApi,get_commodity_categories_list,GET /Commodities/Categories/List,Provides the list of categories., +CommoditiesApi,commodities_list_get,GET /Commodities/List,Provide the detailed list of the commodities available in DataBridges platform, +CommodityUnitsApi,commodity_units_conversion_list_get,GET /CommodityUnits/Conversion/List,Provides conversion factors to Kilogram or Litres for each convertible unit of measure., +CommodityUnitsApi,commodity_units_list_get,GET /CommodityUnits/List,Provides the detailed list of the unit of measure available in DataBridges platform, +CurrencyApi,currency_list_get,GET /Currency/List,"Returns the list of currencies available in the internal VAM database, with Currency 3-letter code, matching with ISO 4217.", +CurrencyApi,currency_usd_indirect_quotation_get,GET /Currency/UsdIndirectQuotation,"Returns the value of the Exchange rates from Trading Economics, for official rates, and DataViz for unofficial rates.", +EconomicDataApi,economic_data_indicator_list_get,GET /EconomicData/IndicatorList,Returns the lists of indicators., +EconomicDataApi,economic_data_indicator_name_get,GET /EconomicData/{indicatorName},Returns the time series of values for different indicators., +GlobalOutlookApi,global_outlook_country_latest_get,GET /GlobalOutlook/CountryLatest,Return the latest country dataset of number of acutely food insecure (in thousands) based on WFP's Global Outlook., +GlobalOutlookApi,global_outlook_global_latest_get,GET /GlobalOutlook/GlobalLatest,Return the latest global dataset of number of acutely food insecure (in millions) based on WFP's Global Outlook., +GlobalOutlookApi,global_outlook_regional_latest_get,GET /GlobalOutlook/RegionalLatest,Return the latest regional dataset of number of acutely food insecure (in millions) based on WFP's Global Outlook., +HungerHotspotApi,hunger_hotspot_categories_and_indicators_get,GET /HungerHotspot/CategoriesAndIndicators,Retrieves Hunger Hotspot categories and indicators., +HungerHotspotApi,hunger_hotspot_data_get,GET /HungerHotspot/Data,Retrieves a paginated list of Hunger Hotspot data., +IncubationApi,cari_adm0_values_get,GET /Cari/Adm0Values,"Retrieves a paginated list of Adm0 CARI results based on the specified indicator, administrative code, and survey.", +IncubationApi,cari_adm1_values_get,GET /Cari/Adm1Values,"Retrieves a paginated list of Adm1 CARI results based on the specified indicator, administrative code, and survey.", +IncubationApi,household_draft_internal_base_data_get,GET /Household/DraftInternalBaseData,"Get data that includes the core household fields only by Survey ID. To access this data, please contact Wael ATTIA for authorization. This endpoint will send you only data you have access to, based on permissions assigned to your application profile. The "apiKey" can be found in the profile section of the DataBridges application.", +IncubationApi,household_full_data_get,GET /Household/FullData,"Get a full dataset that includes all the fields included in the survey in addition to the core household fields by Survey ID. To access this data, please contact Wael ATTIA for authorization. This endpoint will send you only data you have access to, based on permissions assigned to your application profile. The "apiKey" can be found in the profile section of the DataBridges application.", +IncubationApi,household_official_use_base_data_get,GET /Household/OfficialUseBaseData,Get data that includes the core household fields only by Survey ID,1 +IncubationApi,household_public_base_data_get,GET /Household/PublicBaseData,Get data that includes the core household fields only by Survey ID,1 +IncubationApi,household_surveys_get,GET /Household/Surveys,"Retrieve 1) Survey IDs, 2) their corresponding XLS Form IDs, and 3) Base XLS Form of all household surveys conducted in a country. A date of reference, SurveyDate, for the data collection is set by the officer responsible for the upload for each survey.",1 +IncubationApi,m_fi_surveys_processed_data_with_keyset_pagination_get,GET /MFI/Surveys/ProcessedDataWithKeysetPagination,"Please use this endpoint only for large data retrieval - Response will include only JSON format - Get a MFI processed data in long format; levels indicate the data aggregation level 1) Normalized Score, 2) Trader Aggregate Score, 3) Market Aggregate Score, 4) Trader Median, 5) Trader Mean, 6) Market Mean; each line corresponds to one of the nine dimensions of scores plus the final MFI aggregate score; 1) Assortment, 2) Availability, 3) Price, 4) Resilience, 5) Competition, 6) Infrastructure, 7) Service, 8) Quality, 9) Access and Protection, and 10) MFI final score; the variable label describes each variable and its value range",1 +IncubationApi,xls_forms_definition_get,GET /XlsForms/definition,Get a complete set of XLS Form definitions of a given XLS Form ID. This is the digital version of the questionnaire used during the data collection exercise., +IpcchApi,ipcch_ipcch_and_equivalent_historical_peaks_get,GET /Ipcch/IPCCHAndEquivalent-HistoricalPeaks,"Retrieves a paginated list of historical IPCCH and Equivalent peaks data, optionally filtered by ISO3 country code.",-1 +IpcchApi,ipcch_ipcch_and_equivalent_latest_peaks_get,GET /Ipcch/IPCCHAndEquivalent-LatestPeaks,"Retrieves a paginated list of the latest IPCCH and Equivalent peaks data, optionally filtered by ISO3 country code.",-1 +IpcchApi,ipcch_ipcch_and_equivalent_most_recent_get,GET /Ipcch/IPCCHAndEquivalent-MostRecent,"Retrieves a paginated list of the most recent IPCCH and Equivalent data, optionally filtered by ISO3 country code.",-1 +IpcchApi,ipcch_ipcch_and_equivalent_peaks_wfp_dashboard_get,GET /Ipcch/IPCCHAndEquivalentPeaks-WFPDashboard,Retrieves a paginated list of IPCCH and Equivalent Peaks data for the WFP Dashboard.,-1 +IpcchApi,ipcch_ipcch_historical_data_get,GET /Ipcch/IPCCH-HistoricalData,Retrieves a paginated list of IPCCH and Equivalent Historical Data.,-1 +MarketPricesApi,market_prices_alps_get,GET /MarketPrices/Alps,Returns time series values of ALPS and PEWI., +MarketPricesApi,market_prices_price_daily_get,GET /MarketPrices/PriceDaily,Returns a daily time series of commodity market prices., +MarketPricesApi,market_prices_price_monthly_get,GET /MarketPrices/PriceMonthly,Returns a monthly time series of commodity market prices., +MarketPricesApi,market_prices_price_raw_get,GET /MarketPrices/PriceRaw,Returns original commodity market prices, +MarketPricesApi,market_prices_price_weekly_get,GET /MarketPrices/PriceWeekly,Returns a weekly time series of commodity market prices., +MarketsApi,markets_geo_json_list_get,GET /Markets/GeoJSONList,Provide a list of geo referenced markets in a specific country, +MarketsApi,markets_list_get,GET /Markets/List,Get a complete list of markets in a country, +MarketsApi,markets_markets_as_csv_get,GET /Markets/MarketsAsCSV,Get a complete list of markets in a country, +MarketsApi,markets_nearby_markets_get,GET /Markets/NearbyMarkets,Find markets near a given location by longitude and latitude within a 15Km distance, +RpmeApi,rpme_base_data_get,GET /Rpme/BaseData,Get data that includes the core RPME fields only by Survey ID, +RpmeApi,rpme_full_data_get,GET /Rpme/FullData,Get a full dataset that includes all the fields included in the survey in addition to the core RPME fields by Survey ID., +RpmeApi,rpme_output_values_get,GET /Rpme/OutputValues,Processed values for each variable used in the assessments, +RpmeApi,rpme_surveys_get,GET /Rpme/Surveys,"Retrieve 1) Survey IDs, 2) their corresponding XLS Form IDs, and 3) Base XLS Form of all RPME surveys conducted in a country. The date of reference, SurveyDate, for the data collection is set by the officer responsible for the upload of each survey.", +RpmeApi,rpme_variables_get,GET /Rpme/Variables,List of variables, +RpmeApi,rpme_xls_forms_get,GET /Rpme/XLSForms,Get a complete list of XLS Forms uploaded on the RPME in a given period of data collection. This is the digital version of the questionnaire used during the data collection exercise., +SurveysApi,m_fi_surveys_base_data_get,GET /MFI/Surveys/BaseData,Get data that includes the core Market Functionality Index (MFI) fields only by Survey ID, +SurveysApi,m_fi_surveys_full_data_get,GET /MFI/Surveys/FullData,"Get a full dataset that includes all the fields included in the survey in addition to the core Market Functionality Index (MFI) fields by Survey ID. To access this data, please contact global.mfi@wfp.org for authorization.", +SurveysApi,m_fi_surveys_get,GET /MFI/Surveys,"Retrieve 1) Survey IDs, 2) their corresponding XLS Form IDs, and 3) Base XLS Form of all MFI surveys conducted in a country. A date of reference, SurveyDate, for the data collection is set by the officer responsible for the upload for each survey.", +SurveysApi,m_fi_surveys_processed_data_get,GET /MFI/Surveys/ProcessedData,"Get a MFI processed data in long format; levels indicate the data aggregation level 1) Normalized Score, 2) Trader Aggregate Score, 3) Market Aggregate Score, 4) Trader Median, 5) Trader Mean, 6) Market Mean; each line corresponds to one of the nine dimensions of scores plus the final MFI aggregate score; 1) Assortment, 2) Availability, 3) Price, 4) Resilience, 5) Competition, 6) Infrastructure, 7) Service, 8) Quality, 9) Access and Protection, and 10) MFI final score; the variable label describes each variable and its value range", +XlsFormsApi,m_fi_xls_forms_get,GET /MFI/XlsForms,Get a complete list of XLS Forms uploaded on the MFI Data Bridge in a given period of data collection. This is the digital version of the questionnaire used during the data collection exercise., diff --git a/pyproject.toml b/pyproject.toml index 39028d2..9b27d7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,6 @@ dependencies = [ "data-bridges-client (>=8.0.0,<9.0.0)", ] - [project.optional-dependencies] STATA = ["stata-setup", "pystata"] R = [] diff --git a/tests/conftest.py b/tests/conftest.py index 4e4630c..7e5deeb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,9 +4,12 @@ import pytest from dotenv import load_dotenv - load_dotenv() +# ------------------------- +# ✅ Fixtures +# ------------------------- + @pytest.fixture def sample_survey_df(): diff --git a/tests/integration/test_client.py b/tests/integration/test_api_client.py similarity index 91% rename from tests/integration/test_client.py rename to tests/integration/test_api_client.py index 621e4e1..63894ff 100644 --- a/tests/integration/test_client.py +++ b/tests/integration/test_api_client.py @@ -2,17 +2,21 @@ import pytest from dotenv import load_dotenv -from data_bridges_knots.client import DataBridgesShapes, config_from_env +from data_bridges_knots.client import ( + DataBridgesKnots, + config_from_env, +) pytestmark = pytest.mark.integration -# ------------------------- -# ✅ Fixtures -# ------------------------- + +# ========================================================= +# ✅ 3. SUCCESS TESTS (expect 200) +# ========================================================= @pytest.fixture -def config_dict(): +def valid_config(): load_dotenv() config = config_from_env() @@ -20,34 +24,9 @@ def config_dict(): @pytest.fixture -def client(config_dict): - return DataBridgesShapes(config_dict) - - -# ------------------------- -# ✅ 1. Import -# ------------------------- - - -def test_import(): - from data_bridges_knots.client import DataBridgesShapes +def client(valid_config): + return DataBridgesKnots(valid_config) - assert DataBridgesShapes is not None - - -# ------------------------- -# ✅ 2. Config -# ------------------------- - - -def test_client_init(config_dict): - client = DataBridgesShapes(config_dict) - assert isinstance(client, DataBridgesShapes) - - -# ========================================================= -# ✅ 3. SUCCESS TESTS (expect 200) -# ========================================================= # ========================================================= # ✅ PRICES & CURRENCY @@ -103,6 +82,8 @@ def test_prices_and_currency_endpoints(client, func, args, kwargs): # Commodity conversions ("get_commodity_units_conversion_list", (), {}), ("get_commodity_units_conversion_list", (), {"country_iso3": "TZA"}), + # Commodity categories + ("get_commodity_categories_list", (), {"country_iso3": "AFG"}), ], ) def test_commodities_endpoints(client, func, args, kwargs): diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py new file mode 100644 index 0000000..f10a2ed --- /dev/null +++ b/tests/unit/test_client.py @@ -0,0 +1,285 @@ +import pytest +from dotenv import load_dotenv + +from data_bridges_knots.client import ( + DataBridgesKnots, + DataBridgesShapes, + config_from_env, +) + + +@pytest.fixture +def valid_config(): + load_dotenv() + config = config_from_env() + + return config + + +@pytest.fixture +def client(valid_config): + return DataBridgesShapes(valid_config) + + +# ------------------------- +# ✅ 1. Import +# ------------------------- + + +def test_deprecated_import(): + from data_bridges_knots.client import DataBridgesShapes + + assert DataBridgesShapes is not None + + +def test_import(): + from data_bridges_knots.client import DataBridgesKnots + + assert DataBridgesKnots is not None + + +# ------------------------- +# ✅ 2. Config +# ------------------------- + + +def test_deprecated_client_init(valid_config): + client = DataBridgesShapes(valid_config) + assert isinstance(client, DataBridgesShapes) + + +def test_client_init(valid_config): + client = DataBridgesKnots(valid_config) + assert isinstance(client, DataBridgesKnots) + + +# % HERE NEW TESTS + + +# # ====================================================== +# # Fixtures +# # ====================================================== + +# @pytest.fixture +# def mock_item(): +# item = MagicMock() +# item.to_dict.return_value = {"col": "value"} +# return item + + +# @pytest.fixture +# def mock_api_response(mock_item): +# response = MagicMock() +# response.items = [mock_item] +# response.total_items = 1 +# return response + + +# # ====================================================== +# # config_from_env +# # ====================================================== + + +# # def test_config_from_env_missing(monkeypatch): +# # monkeypatch.delenv("WFP_API_CLIENT_ID", raising=False) + +# # with pytest.raises(ValueError): +# # config_from_env() + + +# # ====================================================== +# # Initialization +# # ====================================================== + +# def test_init_success(valid_config): +# client = DataBridgesShapes(valid_config) +# assert client.config == valid_config + + +# def test_validate_config_failure(): +# with pytest.raises(ValueError): +# DataBridgesShapes({"WFP_API_CLIENT_ID": "only"}) + + +# def test_load_config_dict(valid_config): +# client = DataBridgesShapes(valid_config) +# loaded = client._load_config(valid_config) +# assert loaded == valid_config + + +# def test_load_config_type_error(valid_config): +# client = DataBridgesShapes(valid_config) + +# with pytest.raises(TypeError): +# client._load_config(123) + + +# # ====================================================== +# # API MOCKING HELPERS +# # ====================================================== + +# def setup_mock_context(mock_client): +# mock_client.return_value.__enter__.return_value = MagicMock() + + +# # ====================================================== +# # get_prices +# # ====================================================== + +# @patch("data_bridges_knots.client.data_bridges_client.ApiClient") +# def test_get_prices(mock_api, mock_client, valid_config, mock_api_response): +# setup_mock_context(mock_client) +# mock_api.return_value.market_prices_price_monthly_get.return_value = mock_api_response + +# client = DataBridgesShapes(valid_config) + +# df = client.get_prices("KEN", start_date="2024-01-01") + +# assert isinstance(df, pd.DataFrame) +# assert not df.empty + + +# # ====================================================== +# # get_exchange_rates +# # ====================================================== + +# @patch("data_bridges_knots.client.data_bridges_client.ApiClient") +# @patch("data_bridges_knots.client.data_bridges_client.CurrencyApi") +# def test_get_exchange_rates(mock_api, mock_client, valid_config, mock_api_response): +# setup_mock_context(mock_client) +# mock_api.return_value.currency_usd_indirect_quotation_get.return_value = mock_api_response + +# client = DataBridgesShapes(valid_config) + +# df = client.get_exchange_rates("KEN") + +# assert len(df) == 1 + + +# # ====================================================== +# # get_commodities_list +# # ====================================================== + +# @patch("data_bridges_knots.client.data_bridges_client.ApiClient") +# @patch("data_bridges_knots.client.data_bridges_client.CommoditiesApi") +# def test_get_commodities_list(mock_api, mock_client, valid_config, mock_api_response): +# setup_mock_context(mock_client) +# mock_api.return_value.commodities_list_get.return_value = mock_api_response + +# client = DataBridgesShapes(valid_config) + +# df = client.get_commodities_list() + +# assert "col" in df.columns + + +# # ====================================================== +# # get_market_geojson_list +# # ====================================================== + +# @patch("data_bridges_knots.client.get_adm0_code", return_value=123) +# @patch("data_bridges_knots.client.data_bridges_client.ApiClient") +# @patch("data_bridges_knots.client.data_bridges_client.MarketsApi") +# def test_get_market_geojson(mock_api, mock_client, mock_adm0, valid_config): +# setup_mock_context(mock_client) + +# mock_response = MagicMock() +# mock_response.model_dump.return_value = {"type": "FeatureCollection"} +# mock_api.return_value.markets_geo_json_list_get.return_value = mock_response + +# client = DataBridgesShapes(valid_config) + +# result = client.get_market_geojson_list("KEN") + +# assert result["type"] == "FeatureCollection" + + +# def test_get_market_geojson_requires_param(valid_config): +# client = DataBridgesShapes(valid_config) + +# with pytest.raises(ValueError): +# client.get_market_geojson_list() + + +# # ====================================================== +# # get_global_outlook +# # ====================================================== + +# @patch("data_bridges_knots.client.data_bridges_client.ApiClient") +# @patch("data_bridges_knots.client.data_bridges_client.GlobalOutlookApi") +# def test_get_global_outlook_country(mock_api, mock_client, valid_config, mock_api_response): +# setup_mock_context(mock_client) +# mock_api.return_value.global_outlook_country_latest_get.return_value = mock_api_response + +# client = DataBridgesShapes(valid_config) + +# df = client.get_global_outlook("country_latest") + +# assert isinstance(df, pd.DataFrame) + + +# def test_get_global_outlook_invalid(valid_config): +# client = DataBridgesShapes(valid_config) + +# with pytest.raises(ValueError): +# client.get_global_outlook("invalid") + + +# # ====================================================== +# # Household questionnaire + choices +# # ====================================================== + +# def test_get_choice_list(valid_config): +# client = DataBridgesShapes(valid_config) + +# client.xlsform = pd.DataFrame([{ +# "fields": [ +# { +# "choiceList": [ +# { +# "name": "food", +# "choices": [ +# {"name": "rice", "label": "Rice"} +# ] +# } +# ] +# } +# ] +# }]) + +# df = client.get_choice_list(1) + +# assert "value" in df.columns +# assert "label" in df.columns + + +# # ====================================================== +# # Error propagation +# # ====================================================== + +# @patch("data_bridges_knots.client.data_bridges_client.ApiClient") +# @patch("data_bridges_knots.client.data_bridges_client.MarketPricesApi") +# def test_get_prices_raises_api_exception(mock_api, mock_client, valid_config): +# from data_bridges_knots.client import ApiException + +# setup_mock_context(mock_client) +# mock_api.return_value.market_prices_price_monthly_get.side_effect = ApiException("fail") + +# client = DataBridgesShapes(valid_config) + +# with pytest.raises(ApiException): +# client.get_prices("KEN") + + +# # ====================================================== +# # String methods +# # ====================================================== + +# def test_repr(valid_config): +# client = DataBridgesShapes(valid_config) +# assert "DataBridgesShapes" in repr(client) + + +# def test_str(valid_config): +# client = DataBridgesShapes(valid_config) +# assert "API Host" in str(client) diff --git a/tests/unit/test_labels.py b/tests/unit/test_labels.py index d1b0e4e..7e41085 100644 --- a/tests/unit/test_labels.py +++ b/tests/unit/test_labels.py @@ -1,6 +1,5 @@ from typing import Dict - import pandas as pd from data_bridges_knots.labels import ( diff --git a/uv.lock b/uv.lock index ff664a7..24e2f48 100644 --- a/uv.lock +++ b/uv.lock @@ -515,7 +515,7 @@ wheels = [ [[package]] name = "data-bridges-knots" -version = "3.1.3" +version = "3.1.4" source = { editable = "." } dependencies = [ { name = "data-bridges-client" },