From 12eee7dae6e3261b5343123cb6437d3159beab3a Mon Sep 17 00:00:00 2001 From: Alexander Clark Date: Wed, 2 Jul 2025 08:59:22 -0700 Subject: [PATCH] lint curves module --- freeride/curves.py | 239 +++++++++++++++++++++++++++++++-------------- 1 file changed, 164 insertions(+), 75 deletions(-) diff --git a/freeride/curves.py b/freeride/curves.py index 246496c..851b8b0 100644 --- a/freeride/curves.py +++ b/freeride/curves.py @@ -1,7 +1,7 @@ import numpy as np import matplotlib.pyplot as plt import warnings -from freeride.plotting import textbook_axes, AREA_FILLS, update_axes_limits +from freeride.plotting import update_axes_limits from freeride.formula import _formula from freeride.affine import ( AffineElement, @@ -10,13 +10,29 @@ blind_sum, horizontal_sum, ) -from freeride.quadratic import QuadraticElement, BaseQuadratic + from freeride.revenue import Revenue, MarginalRevenue -from IPython.display import Latex, display -from bokeh.plotting import figure, show +from bokeh.plotting import figure from bokeh.models import HoverTool, ColumnDataSource from freeride.exceptions import PPFError +__all__ = [ + "ppf_sum", + "BaseAffine", + "Demand", + "Supply", + "Constraint", + "PPF", + "AffineElement", + "Affine", + "intersection", + "blind_sum", + "horizontal_sum", + "Revenue", + "MarginalRevenue", +] + + def ppf_sum(*curves, comparative_advantage=True): """Combine production possibilities frontiers. @@ -35,7 +51,10 @@ def ppf_sum(*curves, comparative_advantage=True): aggregate frontier. """ - slope_and_curves = sorted([(s.slope, s) for s in curves], reverse=comparative_advantage) + slope_and_curves = sorted( + [(s.slope, s) for s in curves], + reverse=comparative_advantage, + ) curves = [t[1] for t in slope_and_curves] x_intercepts = [c.q_intercept for c in curves] y_intercepts = [c.intercept for c in curves] @@ -43,7 +62,7 @@ def ppf_sum(*curves, comparative_advantage=True): for key, ppf in enumerate(curves): previous_x = sum(x_intercepts[0:key]) - below_y = sum(y_intercepts[key+1:]) + below_y = sum(y_intercepts[key + 1:]) new = ppf.vertical_shift(below_y, inplace=False) new.horizontal_shift(previous_x) @@ -56,7 +75,14 @@ def ppf_sum(*curves, comparative_advantage=True): class BaseAffine: - def __init__(self, intercept=None, slope=None, elements=None, inverse=True, sum_elements=True): + def __init__( + self, + intercept=None, + slope=None, + elements=None, + inverse=True, + sum_elements=True, + ): """ Initialize the BaseAffine object. @@ -67,11 +93,14 @@ def __init__(self, intercept=None, slope=None, elements=None, inverse=True, sum_ slope : float or list of floats, optional The slope(s) of the affine transformation. Default is None. elements : list of AffineElement, optional - List of AffineElement objects. If provided, it will override `intercept` and `slope`. Default is None. + List of AffineElement objects. If provided, it will override + `intercept` and `slope`. Default is None. inverse : bool, optional - Indicates if the transformation should be inverted. Default is True. + Indicates if the transformation should be inverted. + Default is True. sum_elements : bool, optional - Whether to sum elements together (True) or keep them as separate pieces (False). Default is True. + Whether to sum elements together (True) or keep them as separate + pieces (False). Default is True. Raises ------ @@ -87,7 +116,10 @@ def __init__(self, intercept=None, slope=None, elements=None, inverse=True, sum_ raise ValueError("Slope and intercept lengths do not match.") zipped = zip(slope, intercept) - elements = [AffineElement(slope=m, intercept=b, inverse=inverse) for m, b in zipped] + elements = [ + AffineElement(slope=m, intercept=b, inverse=inverse) + for m, b in zipped + ] self.elements = elements self.sum_elements = sum_elements @@ -107,7 +139,8 @@ def _has_perfectly_elastic_segment(self): return any(el.slope == 0 for el in self.elements) def _has_perfectly_inelastic_segment(self): - """Return ``True`` if any element is perfectly inelastic (infinite slope).""" + """Return ``True`` if any element is perfectly inelastic (infinite + slope).""" return any(np.isinf(el.slope) for el in self.elements) @property @@ -122,8 +155,12 @@ def has_perfectly_inelastic_segment(self): @property def has_perfect_segment(self): - """bool: Whether the curve contains a perfectly elastic or inelastic segment.""" - return self.has_perfectly_elastic_segment or self.has_perfectly_inelastic_segment + """bool: Whether the curve contains a perfectly elastic or inelastic + segment.""" + return ( + self.has_perfectly_elastic_segment + or self.has_perfectly_inelastic_segment + ) @classmethod def from_two_points(cls, x1, y1, x2, y2): @@ -141,7 +178,8 @@ def from_points(cls, xy_points): """ Creates an Affine object from two points. - In the future, this might be extended to allow for three or more points. + In the future, this might be extended to allow for three or more + points. """ A_array = [[qp[0], 1] for qp in xy_points] @@ -159,14 +197,18 @@ def from_formula(cls, equation: str): return cls(slope=slope, intercept=intercept) def horizontal_shift(self, delta, inplace=True): - new_elements = [e.horizontal_shift(delta, inplace=False) for e in self.elements] + new_elements = [ + e.horizontal_shift(delta, inplace=False) for e in self.elements + ] if inplace: self.__init__(elements=new_elements) else: return type(self)(elements=new_elements) def vertical_shift(self, delta, inplace=True): - new_elements = [e.vertical_shift(delta, inplace=False) for e in self.elements] + new_elements = [ + e.vertical_shift(delta, inplace=False) for e in self.elements + ] if inplace: self.__init__(elements=new_elements) else: @@ -181,7 +223,9 @@ def q_intercept(self): class Demand(Affine): - def __init__(self, intercept=None, slope=None, elements=None, inverse = True): + def __init__( + self, intercept=None, slope=None, elements=None, inverse=True + ): """ Initializes a Demand curve object. """ @@ -191,11 +235,12 @@ def __init__(self, intercept=None, slope=None, elements=None, inverse = True): # Warn about perfectly elastic segments if self.has_perfectly_elastic_segment: warnings.warn( - f"Created perfectly elastic demand curve. " - f"Note: Due to current implementation limitations, this curve " - f"cannot be used with Equilibrium or combined with other curves. " - f"The economics are valid, but the software support is incomplete.", - UserWarning + "Created perfectly elastic demand curve. " + "Note: Due to current implementation limitations, this curve " + "cannot be used with Equilibrium or combined with other " + "curves. The economics are valid, but the software support is " + "incomplete.", + UserWarning, ) def _check_slope(self): @@ -207,9 +252,10 @@ def _check_slope(self): for intercept in self.intercept: if intercept <= 0: raise Exception( - f"Demand curve intercept must be positive (got {intercept}). " - f"A demand curve represents willingness to pay, so the price intercept " - f"should be positive." + "Demand curve intercept must be positive " + f"(got {intercept}). " + "A demand curve represents willingness to pay, so " + "the price intercept should be positive." ) if not self.has_perfectly_elastic_segment: @@ -218,7 +264,8 @@ def _check_slope(self): def q(self, p): """ - Calculate quantity demanded at price p, handling perfectly elastic segments. + Calculate quantity demanded at price p, handling perfectly elastic + segments. For horizontal demand at price P*: - q(P*) = 0 (with warning about indeterminacy) @@ -232,9 +279,11 @@ def q(self, p): p_star = element.intercept if np.isclose(p, p_star): warnings.warn( - f"Quantity demanded is indeterminate at P={p_star} for perfectly elastic demand. " - f"Returning np.inf as a placeholder (actual quantity is indeterminate).", - UserWarning + f"Quantity demanded is indeterminate at P={p_star}" + " for perfectly elastic demand. " + "Returning np.inf as a placeholder " + "(actual quantity is indeterminate).", + UserWarning, ) return np.inf elif p > p_star: @@ -245,7 +294,7 @@ def q(self, p): # Default behavior for non-horizontal curves return super().q(p) - def consumer_surplus(self, p, q = None): + def consumer_surplus(self, p, q=None): return self.surplus(p, q) def total_revenue(self): @@ -259,6 +308,7 @@ def marginal_revenue(self): def __and__(self, other): """Create a Market from intersection with Supply using & operator.""" from .equilibrium import Market + if isinstance(other, Supply): return Market(demand=self, supply=other) else: @@ -267,7 +317,9 @@ def __and__(self, other): class Supply(Affine): - def __init__(self, intercept=None, slope=None, elements=None, inverse=True): + def __init__( + self, intercept=None, slope=None, elements=None, inverse=True + ): """ Initializes a Supply curve object. """ @@ -277,11 +329,12 @@ def __init__(self, intercept=None, slope=None, elements=None, inverse=True): # Warn about perfectly elastic segments if self.has_perfectly_elastic_segment: warnings.warn( - f"Created perfectly elastic supply curve. " - f"Note: Due to current implementation limitations, this curve " - f"cannot be used with Equilibrium or combined with other curves. " - f"The economics are valid, but the software support is incomplete.", - UserWarning + "Created perfectly elastic supply curve. " + "Note: Due to current implementation limitations, this curve " + "cannot be used with Equilibrium or combined with other " + "curves. The economics are valid, but the software support is " + "incomplete.", + UserWarning, ) def _check_slope(self): @@ -294,7 +347,8 @@ def _check_slope(self): def q(self, p): """ - Calculate quantity supplied at price p, handling perfectly elastic segments. + Calculate quantity supplied at price p, handling perfectly elastic + segments. For horizontal supply at price P*: - q(P*) = 0 (with warning about indeterminacy) @@ -308,9 +362,11 @@ def q(self, p): p_star = element.intercept if np.isclose(p, p_star): warnings.warn( - f"Quantity supplied is indeterminate at P={p_star} for perfectly elastic supply. " - f"Returning np.inf as a placeholder (actual quantity is indeterminate).", - UserWarning + f"Quantity supplied is indeterminate at P={p_star}" + " for perfectly elastic supply. " + "Returning np.inf as a placeholder (actual " + "quantity is indeterminate).", + UserWarning, ) return np.inf elif p > p_star: @@ -321,12 +377,13 @@ def q(self, p): # Default behavior for non-horizontal curves return super().q(p) - def producer_surplus(self, p, q = None): + def producer_surplus(self, p, q=None): return -self.surplus(p, q) def __and__(self, other): """Create a Market from intersection with Demand using & operator.""" from .equilibrium import Market + if isinstance(other, Demand): return Market(demand=other, supply=self) else: @@ -335,26 +392,37 @@ def __and__(self, other): class Constraint(BaseAffine): - def __init__(self, p1, p2, endowment=1, name1=None, name2=None, elements=None, inverse=True): - ''' + def __init__( + self, + p1, + p2, + endowment=1, + name1=None, + name2=None, + elements=None, + inverse=True, + ): + """ Incomplete. - ''' + """ if elements is None: - slope = -p1/p2 - intercept = endowment/p2 + slope = -p1 / p2 + intercept = endowment / p2 super().__init__(intercept, slope, elements, inverse) else: super().__init__(None, None, elements, inverse) class PPF(BaseAffine): - ''' + """ Production possibilities frontier. - ''' + """ - def __init__(self, intercept=None, slope=None, elements=None, inverse=True): - ''' + def __init__( + self, intercept=None, slope=None, elements=None, inverse=True + ): + """ Initializes a PPF object with given slope and intercept or elements. Parameters @@ -364,32 +432,33 @@ def __init__(self, intercept=None, slope=None, elements=None, inverse=True): slope : float or list of float, optional The slope(s) of the elements. elements : list of AffineElement, optional - A list of AffineElements whose horizontal sum defines the PPF. + A list of AffineElements whose horizontal sum defines the + PPF. inverse : bool, optional - When inverse is True, it is assumed that equations are in the form P(Q). + When inverse is True, it is assumed that equations are in the form + P(Q). Raises ------ ValueError If the lengths of `slope` and `intercept` do not match. - ''' + """ super().__init__(intercept, slope, elements, inverse) self._check_slope() self.pieces = ppf_sum(*self.elements) - def _check_slope(self): for slope in self.slope: if slope >= 0 or np.isinf(slope): raise PPFError("Upward-sloping or infinite-slope PPF.") - def __add__(self, other): elements = self.elements + other.elements return type(self)(elements=elements) def __call__(self, x): - """Return the quantity of the second good for ``x`` units of the first.""" + """Return the quantity of the second good for ``x`` units of the + first.""" for piece in self.pieces: if piece: a, b = piece._domain @@ -398,14 +467,18 @@ def __call__(self, x): return np.nan def horizontal_shift(self, delta, inplace=True): - new_elements = [e.horizontal_shift(delta, inplace=False) for e in self.elements] + new_elements = [ + e.horizontal_shift(delta, inplace=False) for e in self.elements + ] if inplace: self.__init__(elements=new_elements) return self return type(self)(elements=new_elements) def vertical_shift(self, delta, inplace=True): - new_elements = [e.vertical_shift(delta, inplace=False) for e in self.elements] + new_elements = [ + e.vertical_shift(delta, inplace=False) for e in self.elements + ] if inplace: self.__init__(elements=new_elements) return self @@ -427,39 +500,57 @@ def __mul__(self, scalar): def __rmul__(self, scalar): return self.__mul__(scalar) - def plot(self, ax=None, set_lims=True, max_q=None, label=True, backend='mpl', **kwargs): - ''' + def plot( + self, + ax=None, + set_lims=True, + max_q=None, + label=True, + backend="mpl", + **kwargs, + ): + """ Plot the ppf. - ''' + """ - if backend == 'bokeh': + if backend == "bokeh": p = figure(width=400, height=400, tools="") - lines_data = {'xs': [], 'ys': [], 'label': []} + lines_data = {"xs": [], "ys": [], "label": []} for key, piece in enumerate(self.pieces): x0, x1 = piece._domain xx = np.linspace(x0, x1) yy = [piece(u) for u in xx] - lines_data['xs'].append(xx) - lines_data['ys'].append(yy) - lines_data['label'].append(f'Piece {key}') + lines_data["xs"].append(xx) + lines_data["ys"].append(yy) + lines_data["label"].append(f"Piece {key}") source = ColumnDataSource(data=lines_data) - p.multi_line(xs='xs', ys='ys', source=source, line_width=2) + p.multi_line(xs="xs", ys="ys", source=source, line_width=2) # Add HoverTool hover = HoverTool( - tooltips=[('', "@label")], - renderers=[p.renderers[-1]]) + tooltips=[("", "@label")], renderers=[p.renderers[-1]] + ) p.add_tools(hover) return p - elif backend == 'mpl': + elif backend == "mpl": if ax is None: fig, ax = plt.subplots() - param_names = ['color', 'linewidth', 'linestyle', 'lw', 'ls', 'marker', 'markersize'] - plot_dict = {key: kwargs[key] for key in kwargs if key in param_names} + param_names = [ + "color", + "linewidth", + "linestyle", + "lw", + "ls", + "marker", + "markersize", + ] + plot_dict = { + key: kwargs[key] for key in kwargs if key in param_names + } # Plot each element for piece in self.pieces: if piece: @@ -484,5 +575,3 @@ def plot(self, ax=None, set_lims=True, max_q=None, label=True, backend='mpl', ** update_axes_limits(ax) return ax - -