diff --git a/src/physrisk/risk_models/generic_risk_model.py b/src/physrisk/risk_models/generic_risk_model.py index 5d76c89f..f60dfe9d 100644 --- a/src/physrisk/risk_models/generic_risk_model.py +++ b/src/physrisk/risk_models/generic_risk_model.py @@ -34,6 +34,7 @@ ) from physrisk.kernel.risk import Measure, MeasureKey, RiskMeasureCalculator from pint import UnitRegistry +from physrisk.utils.units import needs_conversion ureg = UnitRegistry() @@ -445,7 +446,7 @@ def calc_measure( param = float( np.interp(return_period, resp.return_periods, resp.intensities) ) - if resp.units != "default": + if needs_conversion(resp.units, bounds.units): param = ureg.convert(param, resp.units, bounds.units) if math.isnan(param): return Measure( diff --git a/src/physrisk/utils/units.py b/src/physrisk/utils/units.py new file mode 100644 index 00000000..32046d1f --- /dev/null +++ b/src/physrisk/utils/units.py @@ -0,0 +1,33 @@ +UNITLESS = {"index", ""} + + +def needs_conversion(source_units: str, target_units: str | None) -> bool: + """Return whether values should be converted between two unit labels. + + Units marked as ``default`` and missing target units do not require + conversion. Equal units also do not require conversion. Unitless indicators (e.g. index) + cannot be converted to or from dimensional units, and attempting to do so raises an error. + + Args: + source_units: Units attached to the source values. + target_units: Units expected by the target consumer, or ``None`` if + unspecified. + + Returns: + ``True`` when unit conversion should be attempted, otherwise ``False``. + + Raises: + ValueError: If source and target are provided, source is not ``default``, and only one is unitless. + """ + if source_units == "default" or target_units in (None, "default"): + return False + + if source_units == target_units: + return False + + if source_units in UNITLESS or target_units in UNITLESS: + raise ValueError( + f"Conversion is not defined between incompatible units: {source_units} and {target_units}" + ) + + return True diff --git a/src/physrisk/vulnerability_models/config_based_vuln_model_acute.py b/src/physrisk/vulnerability_models/config_based_vuln_model_acute.py index b1f4dfc6..2faab572 100644 --- a/src/physrisk/vulnerability_models/config_based_vuln_model_acute.py +++ b/src/physrisk/vulnerability_models/config_based_vuln_model_acute.py @@ -39,6 +39,7 @@ from physrisk.vulnerability_models.impact_function_selector import ( ImpactFunctionSelector, ) +from physrisk.utils.units import needs_conversion ureg = UnitRegistry() logger = logging.getLogger(__name__) @@ -152,7 +153,7 @@ def get_distributions( "Future hazard curve is not monotonic; adjusting to ensure non-decreasing curve." ) - conversion = curve.indicator_units is not None and future.units != "default" + conversion = needs_conversion(future.units, curve.indicator_units) if conversion: fut_intensities = ureg.convert( fut_intensities, future.units, curve.indicator_units diff --git a/tests/vulnerability_models/test_config_based_vulnerability.py b/tests/vulnerability_models/test_config_based_vulnerability.py index 34e68a5e..cf57caaa 100644 --- a/tests/vulnerability_models/test_config_based_vulnerability.py +++ b/tests/vulnerability_models/test_config_based_vulnerability.py @@ -34,6 +34,7 @@ RiverineInundation, Wind, ) +from physrisk.kernel.hazard_model import HazardEventDataResponse from physrisk.kernel.impact_distrib import ImpactType from physrisk.vulnerability_models.config_based_impact_curves import ( PiecewiseLinearImpactCurve, @@ -812,6 +813,42 @@ def test_create_vulnerability_model(): ) +def test_config_based_vulnerability_accepts_index_units(): + config_items = [ + VulnerabilityConfigItem( + hazard_class="Fire", + asset_class="Asset", + asset_identifier="type=Generic,location=Generic", + indicator_id="susceptibility", + indicator_units="index", + impact_id="damage", + impact_units=None, + curve_type="indicator/piecewise_linear", + points_x=[0.0, 0.5, 1.0], + points_y=[0.0, 0.2, 1.0], + ) + ] + factory = VulnerabilityModelsFactory(config=config_items) + vulnerability_models = factory.vulnerability_models() + model = vulnerability_models.vuln_model_for_asset_of_type(Asset)[0] + asset = Asset(location="Generic", latitude=0.0, longitude=0.0) + hazard_response = HazardEventDataResponse( + return_periods=np.array([10.0, 100.0, 1000.0]), + intensities=np.array([0.2, 0.6, 0.9]), + units="index", + ) + + vulnerability, event = model.get_distributions(asset, [hazard_response]) + + np.testing.assert_array_equal( + vulnerability.intensity_bins, np.array([0.2, 0.6, 0.9, 0.9]) + ) + np.testing.assert_array_almost_equal( + vulnerability.impact_bins, np.array([0.08, 0.36, 0.84, 0.84]) + ) + assert event.units == "index" + + @pytest.mark.skip("example, not test") def test_read_write_utilities(tmp_path): config = basic_vulnerability_config()