Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/physrisk/risk_models/generic_risk_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
)
from physrisk.kernel.risk import Measure, MeasureKey, RiskMeasureCalculator
from pint import UnitRegistry
from physrisk.utils.units import needs_conversion

ureg = UnitRegistry()

Expand Down Expand Up @@ -445,7 +446,7 @@ def calc_measure(
param = float(
np.interp(return_period, resp.return_periods, resp.intensities)
)
if resp.units != "default":
if needs_conversion(resp.units, bounds.units):
param = ureg.convert(param, resp.units, bounds.units)
if math.isnan(param):
return Measure(
Expand Down
33 changes: 33 additions & 0 deletions src/physrisk/utils/units.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
UNITLESS = {"index", ""}


def needs_conversion(source_units: str, target_units: str | None) -> bool:
"""Return whether values should be converted between two unit labels.

Units marked as ``default`` and missing target units do not require
conversion. Equal units also do not require conversion. Unitless indicators (e.g. index)
cannot be converted to or from dimensional units, and attempting to do so raises an error.

Args:
source_units: Units attached to the source values.
target_units: Units expected by the target consumer, or ``None`` if
unspecified.

Returns:
``True`` when unit conversion should be attempted, otherwise ``False``.

Raises:
ValueError: If source and target are provided, source is not ``default``, and only one is unitless.
"""
if source_units == "default" or target_units in (None, "default"):
return False

if source_units == target_units:
return False

if source_units in UNITLESS or target_units in UNITLESS:
raise ValueError(
f"Conversion is not defined between incompatible units: {source_units} and {target_units}"
)

return True
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from physrisk.vulnerability_models.impact_function_selector import (
ImpactFunctionSelector,
)
from physrisk.utils.units import needs_conversion

ureg = UnitRegistry()
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -152,7 +153,7 @@ def get_distributions(
"Future hazard curve is not monotonic; adjusting to ensure non-decreasing curve."
)

conversion = curve.indicator_units is not None and future.units != "default"
conversion = needs_conversion(future.units, curve.indicator_units)
if conversion:
fut_intensities = ureg.convert(
fut_intensities, future.units, curve.indicator_units
Expand Down
37 changes: 37 additions & 0 deletions tests/vulnerability_models/test_config_based_vulnerability.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
RiverineInundation,
Wind,
)
from physrisk.kernel.hazard_model import HazardEventDataResponse
from physrisk.kernel.impact_distrib import ImpactType
from physrisk.vulnerability_models.config_based_impact_curves import (
PiecewiseLinearImpactCurve,
Expand Down Expand Up @@ -812,6 +813,42 @@ def test_create_vulnerability_model():
)


def test_config_based_vulnerability_accepts_index_units():
config_items = [
VulnerabilityConfigItem(
hazard_class="Fire",
asset_class="Asset",
asset_identifier="type=Generic,location=Generic",
indicator_id="susceptibility",
indicator_units="index",
impact_id="damage",
impact_units=None,
curve_type="indicator/piecewise_linear",
points_x=[0.0, 0.5, 1.0],
points_y=[0.0, 0.2, 1.0],
)
]
factory = VulnerabilityModelsFactory(config=config_items)
vulnerability_models = factory.vulnerability_models()
model = vulnerability_models.vuln_model_for_asset_of_type(Asset)[0]
asset = Asset(location="Generic", latitude=0.0, longitude=0.0)
hazard_response = HazardEventDataResponse(
return_periods=np.array([10.0, 100.0, 1000.0]),
intensities=np.array([0.2, 0.6, 0.9]),
units="index",
)

vulnerability, event = model.get_distributions(asset, [hazard_response])

np.testing.assert_array_equal(
vulnerability.intensity_bins, np.array([0.2, 0.6, 0.9, 0.9])
)
np.testing.assert_array_almost_equal(
vulnerability.impact_bins, np.array([0.08, 0.36, 0.84, 0.84])
)
assert event.units == "index"


@pytest.mark.skip("example, not test")
def test_read_write_utilities(tmp_path):
config = basic_vulnerability_config()
Expand Down
Loading