diff --git a/examples/headland_inversion/Makefile b/examples/headland_inversion/Makefile index 8938efe8f..f64469269 100644 --- a/examples/headland_inversion/Makefile +++ b/examples/headland_inversion/Makefile @@ -3,13 +3,16 @@ all: invert plot CASE ?= Uniform # Regions IndependentPointsScheme GradientReg HessianReg STATIONS = stationA stationB stationC stationD stationE stationF stationG PARALLEL = 1 +IP_INTERPOLATION ?= linear # linear cubic rbf +IP_RBF_KERNEL ?= thin_plate_spline +IP_RBF_SMOOTHING ?= 0.0 invert: - mpiexec --use-hwthread-cpus -np $(PARALLEL) python inverse_problem.py --case $(CASE) --no-consistency-test --no-taylor-test + mpiexec --use-hwthread-cpus -np $(PARALLEL) python inverse_problem.py --case $(CASE) --no-consistency-test --no-taylor-test --ip-interpolation $(IP_INTERPOLATION) --ip-rbf-kernel $(IP_RBF_KERNEL) --ip-rbf-smoothing $(IP_RBF_SMOOTHING) plot: for station in $(STATIONS); do \ - python3 plot_velocity_progress.py -s $$station --case $(CASE); \ + python3 plot_velocity_progress.py -s $$station --case $(CASE) --ip-interpolation $(IP_INTERPOLATION); \ done; \ clean: diff --git a/examples/headland_inversion/README.md b/examples/headland_inversion/README.md index 14b8fc9e8..ceca192b1 100644 --- a/examples/headland_inversion/README.md +++ b/examples/headland_inversion/README.md @@ -145,11 +145,20 @@ The independent point scheme approach works in the same way as the region-based function which tells us how the Manning field changes with respect to our input independent point values. The only thing we need to do is change the masks we generate. -Instead of masks with 0/1 values, we have masks which describe the contribution of each point to the rest of the -domain. Note that this will only work for linear interpolation, as we cannot generate static coefficients for non-linear -mappings (RBF, quadratic, cubic etc.). In those cases, we would need to 'annotate' the interpolation functions for -`pyadjoint` to track the gradient through. We can generate a mapping in the same way to force a smooth surface, but it -would not be true RBF/quadratic/cubic interpolation. +Instead of masks with 0/1 values, we have a mapping which describes the contribution of each point to the rest of the +domain. Linear interpolation uses fixed Firedrake basis fields. Cubic and radial-basis interpolation use Thetis' +independent-point external operator, which evaluates the SciPy interpolation and provides the adjoint vector-Jacobian +product required by `pyadjoint`. + +```sh +source ~/firedrake/bin/activate +make invert CASE=IndependentPointsScheme IP_INTERPOLATION=linear +make invert CASE=IndependentPointsScheme IP_INTERPOLATION=cubic +make invert CASE=IndependentPointsScheme IP_INTERPOLATION=rbf +``` + +The RBF mode uses SciPy's `RBFInterpolator`. The default kernel is `thin_plate_spline` with zero smoothing; these can be +changed through `IP_RBF_KERNEL` and `IP_RBF_SMOOTHING`. ## Post-processing diff --git a/examples/headland_inversion/inverse_problem.py b/examples/headland_inversion/inverse_problem.py index b0236c14c..f2a8e1b40 100644 --- a/examples/headland_inversion/inverse_problem.py +++ b/examples/headland_inversion/inverse_problem.py @@ -3,7 +3,6 @@ from firedrake import * from firedrake.adjoint import * from model_config import construct_solver -from scipy.interpolate import LinearNDInterpolator, NearestNDInterpolator import h5py import numpy as np import argparse @@ -27,10 +26,21 @@ help='Skip consistency test') parser.add_argument('--no-taylor-test', action='store_true', help='Skip Taylor test') +parser.add_argument('--ip-interpolation', + help='Interpolation method for the independent point scheme', + choices=['linear', 'cubic', 'rbf'], + default='linear') +parser.add_argument('--ip-rbf-kernel', + help='SciPy RBFInterpolator kernel for the independent point scheme', + default='thin_plate_spline') +parser.add_argument('--ip-rbf-smoothing', type=float, + help='SciPy RBFInterpolator smoothing parameter for the independent point scheme', + default=0.0) args = parser.parse_args() do_consistency_test = not args.no_consistency_test do_taylor_test = not args.no_taylor_test no_exports = os.getenv('THETIS_REGRESSION_TEST') is not None +ip_interpolation = args.ip_interpolation case_to_output_dir = { 'Uniform': 'uniform_friction', @@ -44,6 +54,8 @@ pwd = os.path.abspath(os.path.dirname(__file__)) output_dir_forward = os.path.join(pwd, 'outputs', 'outputs_forward') output_dir_invert = os.path.join(pwd, 'outputs', 'outputs_inverse', case_to_output_dir[selected_case]) +if selected_case == 'IndependentPointsScheme' and ip_interpolation != 'linear': + output_dir_invert += f'_{ip_interpolation}' continue_annotation() @@ -71,7 +83,7 @@ local_N = coordinates.shape[0] N = mesh2d.comm.allreduce(local_N, MPI.SUM) # allreduce sums the local numbers to get the total number of coordinates -masks, M, m_true = None, 0, [] +masks, point_mapping, M, m_true = None, None, 0, [] # Create a FunctionSpace on the mesh (corresponds to Manning) V = get_functionspace(mesh2d, 'CG', 1) @@ -104,6 +116,7 @@ for m_, mask_ in zip(m_true, masks): manning_2d += m_ * mask_ elif selected_case == 'IndependentPointsScheme': + print_output(f'Using {ip_interpolation} independent point interpolation') # Define our values for n points = np.array([ [0, 0], [0, 0.5], [0, 1], [0.5, 0], [1, 0], [1, 0.5], [1, 1], [0.5, 0.5], @@ -116,24 +129,11 @@ m_true = [domain_constant(0.03 - 0.0005 * i, mesh2d) for i in range(len(points))] M = len(m_true) - linear_interpolator = LinearNDInterpolator(points, np.eye(len(points))) - nearest_interpolator = NearestNDInterpolator(points, np.eye(len(points))) - - # Apply the interpolators to the mesh coordinates - linear_coefficients = linear_interpolator(coordinates) - nan_mask = np.isnan(linear_coefficients).any(axis=1) - linear_coefficients[nan_mask] = nearest_interpolator(coordinates[nan_mask]) - - # Create Function objects to store the coefficients - masks = [Function(V) for _ in range(len(points))] - - # Assign the linear coefficients to the masks - for i, mask in enumerate(masks): - mask.dat.data[:] = linear_coefficients[:, i] - - manning_2d.assign(0) - for m_, mask_ in zip(m_true, masks): - manning_2d += m_ * mask_ + point_mapping = inversion_tools.IndependentPointControlMapping( + V, points, method=ip_interpolation, + rbf_kernel=args.ip_rbf_kernel, rbf_smoothing=args.ip_rbf_smoothing) + masks = point_mapping.masks + point_mapping.assign(manning_2d, m_true) elif selected_case in ('GradientReg', 'HessianReg'): pass else: @@ -245,12 +245,16 @@ manning_const = domain_constant(0.03, mesh2d, name='Manning') manning_2d.project(manning_const) inv_manager.add_control(manning_const) -elif selected_case in ('Regions', 'IndependentPointsScheme'): +elif selected_case == 'Regions': m_values = [domain_constant(0.03 - 0.0005 * i, mesh2d) for i in range(M)] manning_2d.assign(0) for m_, mask_ in zip(m_values, masks): manning_2d += m_ * mask_ inv_manager.add_control(m_values, mappings=masks) +elif selected_case == 'IndependentPointsScheme': + m_values = [domain_constant(0.03 - 0.0005 * i, mesh2d) for i in range(M)] + point_mapping.assign(manning_2d, m_values) + inv_manager.add_control(m_values, mappings=point_mapping) else: manning_2d.assign(0.04) inv_manager.add_control(manning_2d) @@ -295,7 +299,7 @@ manning_2d.assign(domain_constant(inv_manager.m_list[-1], mesh2d)) if not no_exports: VTKFile(os.path.join(options.output_directory, 'manning_optimised.pvd')).write(manning_2d) - elif selected_case == 'Regions' or selected_case == 'IndependentPointsScheme': + elif selected_case == 'Regions': P1_2d = get_functionspace(mesh2d, 'CG', 1) manning_2d = Function(P1_2d, name='manning2d') manning_2d.assign(0) @@ -303,6 +307,10 @@ manning_2d += m_ * mask_ if not no_exports: VTKFile(os.path.join(options.output_directory, 'manning_optimised.pvd')).write(manning_2d) + elif selected_case == 'IndependentPointsScheme': + manning_2d = point_mapping.project(inv_manager.m_list, name='manning2d') + if not no_exports: + VTKFile(os.path.join(options.output_directory, 'manning_optimised.pvd')).write(manning_2d) else: name = cc.name() oc.rename(name) diff --git a/examples/headland_inversion/plot_velocity_progress.py b/examples/headland_inversion/plot_velocity_progress.py index daac0c01f..ca2446709 100644 --- a/examples/headland_inversion/plot_velocity_progress.py +++ b/examples/headland_inversion/plot_velocity_progress.py @@ -20,6 +20,10 @@ choices=['Uniform', 'Regions', 'IndependentPointsScheme', 'GradientReg', 'HessianReg'], default=['IndependentPointsScheme'], ) +parser.add_argument('--ip-interpolation', + help='Interpolation method for the independent point scheme', + choices=['linear', 'cubic', 'rbf'], + default='linear') args = parser.parse_args() station_names = sorted(args.station) station_str = '-'.join(station_names) @@ -35,6 +39,8 @@ selected_case = args.case[0] output_dir_forward = os.path.join('outputs', 'outputs_forward') output_dir_invert = os.path.join('outputs', 'outputs_inverse', case_to_output_dir[selected_case]) +if selected_case == 'IndependentPointsScheme' and args.ip_interpolation != 'linear': + output_dir_invert += f'_{args.ip_interpolation}' # --- Loop through stations --- for sta in station_names: diff --git a/test_adjoint/test_independent_point_mapping.py b/test_adjoint/test_independent_point_mapping.py new file mode 100644 index 000000000..ae23ebdf8 --- /dev/null +++ b/test_adjoint/test_independent_point_mapping.py @@ -0,0 +1,54 @@ +import numpy +import pytest + +from firedrake import * +from firedrake.adjoint import * +from pyadjoint import get_working_tape + +from thetis.inversion_tools import IndependentPointControlMapping + + +def make_real_controls(mesh, values, name): + R = FunctionSpace(mesh, 'R', 0) + controls = [] + for i, value in enumerate(values): + control = Function(R, name=f'{name}_{i:02d}') + control.assign(float(value)) + controls.append(control) + return controls + + +@pytest.mark.parametrize('method', ['linear', 'cubic', 'rbf']) +def test_independent_point_mapping_gradient(method): + get_working_tape().clear_tape() + continue_annotation() + + mesh = UnitSquareMesh(8, 8) + V = FunctionSpace(mesh, 'CG', 1) + x, y = SpatialCoordinate(mesh) + target = Function(V).interpolate(0.022 + 0.004*sin(pi*x)*cos(pi*y) + 0.002*x) + + points = numpy.array([ + [0.0, 0.0], + [1.0, 0.0], + [0.0, 1.0], + [1.0, 1.0], + [0.2, 0.25], + [0.75, 0.2], + [0.25, 0.75], + [0.8, 0.8], + [0.5, 0.5], + ]) + values = numpy.array([0.020, 0.024, 0.023, 0.021, 0.026, 0.019, 0.025, 0.022, 0.027]) + direction_values = numpy.array([0.30, -0.20, 0.15, -0.10, 0.25, -0.35, 0.40, -0.30, 0.20]) + + controls = make_real_controls(mesh, values, f'{method}_control') + directions = make_real_controls(mesh, direction_values, f'{method}_direction') + mapping = IndependentPointControlMapping(V, points, method=method) + + field = Function(V, name=f'{method}_field') + mapping.assign(field, controls) + J = assemble(0.5*(field - target)**2*dx) + reduced_functional = ReducedFunctional(J, [Control(c) for c in controls]) + + assert taylor_test(reduced_functional, controls, directions) > 1.9 diff --git a/thetis/inversion_tools.py b/thetis/inversion_tools.py index 79bc60b95..0848b286f 100644 --- a/thetis/inversion_tools.py +++ b/thetis/inversion_tools.py @@ -1,5 +1,6 @@ import firedrake as fd from firedrake.adjoint import * +from firedrake.external_operators import AbstractExternalOperator, assemble_method import ufl from .configuration import FrozenHasTraits from .solver2d import FlowSolver2d @@ -11,7 +12,10 @@ import abc import numpy import h5py -from scipy.interpolate import interp1d +from scipy.interpolate import ( + interp1d, LinearNDInterpolator, NearestNDInterpolator, + CloughTocher2DInterpolator, RBFInterpolator, +) import time as time_mod from pyadjoint.optimization.optimization import SciPyConvergenceError import os @@ -44,6 +48,211 @@ def message_str(self, cost_value): return f"Cost function value: {cost_value}" +def _function_space_node_coordinates(function_space): + return function_space.mesh().coordinates.dat.data_ro + + +def _interpolate_independent_point_values(function_space, points, values, method, + rbf_kernel='thin_plate_spline', rbf_smoothing=0.0): + """Evaluate an independent-point interpolation on local mesh nodes.""" + coordinates = _function_space_node_coordinates(function_space) + if method == 'linear': + interpolator = LinearNDInterpolator(points, values) + nearest_interpolator = NearestNDInterpolator(points, values) + interpolated = interpolator(coordinates) + nan_mask = numpy.isnan(interpolated) + if interpolated.ndim > 1: + nan_mask = nan_mask.any(axis=1) + if numpy.any(nan_mask): + interpolated[nan_mask] = nearest_interpolator(coordinates[nan_mask]) + return interpolated + elif method == 'cubic': + interpolator = CloughTocher2DInterpolator(points, values) + nearest_interpolator = NearestNDInterpolator(points, values) + interpolated = interpolator(coordinates) + nan_mask = numpy.isnan(interpolated) + if interpolated.ndim > 1: + nan_mask = nan_mask.any(axis=1) + if numpy.any(nan_mask): + interpolated[nan_mask] = nearest_interpolator(coordinates[nan_mask]) + return interpolated + elif method == 'rbf': + interpolator = RBFInterpolator(points, values, kernel=rbf_kernel, smoothing=rbf_smoothing) + return interpolator(coordinates) + else: + raise ValueError(f"Unsupported independent point interpolation method '{method}'") + + +def _independent_point_coefficients(function_space, points, method, + rbf_kernel='thin_plate_spline', rbf_smoothing=0.0): + values = numpy.eye(len(points)) + coefficients = _interpolate_independent_point_values( + function_space, points, values, method, rbf_kernel=rbf_kernel, rbf_smoothing=rbf_smoothing) + return tuple(tuple(float(v) for v in row) for row in coefficients) + + +class IndependentPointInterpolator(AbstractExternalOperator): + """ + External operator mapping independent point controls to a mesh field. + + The forward pass evaluates the requested SciPy interpolation. The adjoint + action uses precomputed interpolation basis fields to provide pyadjoint with + the vector-Jacobian product with respect to each scalar point value. + """ + + def __init__(self, *operands, function_space, derivatives=None, argument_slots=(), operator_data): + AbstractExternalOperator.__init__( + self, *operands, function_space=function_space, derivatives=derivatives, + argument_slots=argument_slots, operator_data=operator_data) + + @property + def points(self): + return numpy.asarray(self.operator_data['points']) + + @property + def method(self): + return self.operator_data['method'] + + @property + def rbf_kernel(self): + return self.operator_data['rbf_kernel'] + + @property + def rbf_smoothing(self): + return self.operator_data['rbf_smoothing'] + + @property + def cubic_derivative_step(self): + return self.operator_data['cubic_derivative_step'] + + @property + def interpolation_coefficients(self): + return numpy.asarray(self.operator_data['coefficients']) + + def control_values(self): + return numpy.array([float(control.dat.data_ro[0]) for control in self.ufl_operands]) + + def coefficient_function(self, index): + coefficient = fd.Function(self.function_space(), name=f'independent_point_basis_{index:02d}') + if self.method == 'cubic': + values = self.control_values() + step = self.cubic_derivative_step + values_plus = values.copy() + values_minus = values.copy() + values_plus[index] += step + values_minus[index] -= step + interp_plus = _interpolate_independent_point_values( + self.function_space(), self.points, values_plus, self.method, + rbf_kernel=self.rbf_kernel, rbf_smoothing=self.rbf_smoothing) + interp_minus = _interpolate_independent_point_values( + self.function_space(), self.points, values_minus, self.method, + rbf_kernel=self.rbf_kernel, rbf_smoothing=self.rbf_smoothing) + coefficient.dat.data_wo[:] = (interp_plus - interp_minus) / (2.0 * step) + else: + coefficient.dat.data_wo[:] = self.interpolation_coefficients[:, index] + return coefficient + + @assemble_method(0, (0,)) + def assemble_operator(self, *args, **kwargs): + field = fd.Function(self.function_space(), name='independent_point_field') + field.dat.data_wo[:] = _interpolate_independent_point_values( + self.function_space(), self.points, self.control_values(), self.method, + rbf_kernel=self.rbf_kernel, rbf_smoothing=self.rbf_smoothing) + return field + + @assemble_method(1, (0, None)) + def assemble_jacobian_action(self, *args, **kwargs): + index, = [j for j, d in enumerate(self.derivatives) if d == 1] + direction = self.argument_slots()[-1] + return fd.Function(self.function_space()).assign( + float(direction.dat.data_ro[0]) * self.coefficient_function(index)) + + @assemble_method(1, (None, 0)) + def assemble_jacobian_adjoint_action(self, *args, **kwargs): + index, = [j for j, d in enumerate(self.derivatives) if d == 1] + output_adjoint = self.argument_slots()[0] + coefficient = self.coefficient_function(index) + + with output_adjoint.dat.vec_ro as y, coefficient.dat.vec_ro as basis: + sensitivity = y.dot(basis) + + control_space = self.ufl_operands[index].function_space() + result = fd.Cofunction(control_space.dual()) + result.dat.data_wo[0] = sensitivity + return result + + +class IndependentPointControlMapping: + """ + Maps scalar independent point controls to a Firedrake field. + + ``linear`` mappings use fixed Firedrake basis Functions. ``cubic`` and + ``rbf`` mappings use :class:`IndependentPointInterpolator` so that + pyadjoint can differentiate through SciPy interpolation calls. + """ + + def __init__(self, function_space, points, method='linear', + rbf_kernel='thin_plate_spline', rbf_smoothing=0.0, + cubic_derivative_step=1.0e-6): + if function_space.value_shape != (): + raise ValueError("IndependentPointControlMapping expects a scalar FunctionSpace") + self.function_space = function_space + self.points = numpy.asarray(points, dtype=float) + self.method = method.lower() + self.rbf_kernel = rbf_kernel + self.rbf_smoothing = float(rbf_smoothing) + self.cubic_derivative_step = float(cubic_derivative_step) + if self.method not in ('linear', 'cubic', 'rbf'): + raise ValueError(f"Unsupported independent point interpolation method '{method}'") + + self.coefficients = _independent_point_coefficients( + self.function_space, self.points, self.method, + rbf_kernel=self.rbf_kernel, rbf_smoothing=self.rbf_smoothing) + self.num_controls = len(self.points) + self.masks = self._build_basis_functions() if self.method == 'linear' else None + + def _build_basis_functions(self): + coefficients = numpy.asarray(self.coefficients) + masks = [] + for i in range(self.num_controls): + mask = fd.Function(self.function_space, name=f'independent_point_basis_{i:02d}') + mask.dat.data_wo[:] = coefficients[:, i] + masks.append(mask) + return masks + + @property + def operator_data(self): + return { + 'points': tuple(tuple(float(v) for v in point) for point in self.points), + 'method': self.method, + 'rbf_kernel': self.rbf_kernel, + 'rbf_smoothing': self.rbf_smoothing, + 'cubic_derivative_step': self.cubic_derivative_step, + 'coefficients': self.coefficients, + } + + def operator(self, controls): + return IndependentPointInterpolator(*controls, function_space=self.function_space, + operator_data=self.operator_data) + + def project(self, controls, name='control'): + if len(controls) != self.num_controls: + raise ValueError(f"Expected {self.num_controls} controls, got {len(controls)}") + if self.method == 'linear': + field = fd.Function(self.function_space, name=name) + field.assign(0.0) + for control, mask in zip(controls, self.masks): + field.assign(field + control * mask) + else: + field = fd.assemble(self.operator(controls)) + field.rename(name) + return field + + def assign(self, target, controls): + target.assign(self.project(controls, name=target.name())) + return target + + class ControlManager: """ Handles an individual control (spatially varying Function, masked combination of Functions, @@ -60,6 +269,7 @@ def __init__(self, control, output_dir, index, no_exports=False, mappings=None): self.mappings = mappings self.is_masked_combination = mappings is not None self.is_field = False + self.num_controls = len(self.controls) # Exporters self.vtk_file = None @@ -71,8 +281,14 @@ def __init__(self, control, output_dir, index, no_exports=False, mappings=None): for c in self.controls: if not isinstance(c, fd.Function): raise ValueError("All masked combination controls must be Functions") - self.mesh = self.controls[0].function_space().mesh() - self.projection_space = fd.FunctionSpace(self.mesh, "CG", 1, variant="equispaced") + if hasattr(self.mappings, 'project'): + self.projection_space = self.mappings.function_space + self.mesh = self.projection_space.mesh() + self.num_controls = self.mappings.num_controls + else: + self.mesh = self.controls[0].function_space().mesh() + self.projection_space = fd.FunctionSpace(self.mesh, "CG", 1, variant="equispaced") + self.num_controls = len(self.mappings) self.is_field = False elif self.controls[0].function_space().ufl_element().family() == "Real": # domain_constant (uniform value Function in the real space) @@ -97,12 +313,16 @@ def __init__(self, control, output_dir, index, no_exports=False, mappings=None): def project_control(self, updated_controls): """Project masked combination or domain_constant to CG1 field for export.""" - field = fd.Function(self.projection_space, name="control") - field.assign(0) if self.is_masked_combination: - for m_, mask_ in zip(updated_controls, self.mappings): - field += m_ * mask_ + if hasattr(self.mappings, 'project'): + field = self.mappings.project(updated_controls, name="control") + else: + field = fd.Function(self.projection_space, name="control") + field.assign(0) + for m_, mask_ in zip(updated_controls, self.mappings): + field += m_ * mask_ else: + field = fd.Function(self.projection_space, name="control") field.project(updated_controls[0]) # domain_constant return field @@ -139,12 +359,16 @@ def export_gradient(self, updated_gradient): else: if not isinstance(updated_gradient, list): updated_gradient = [updated_gradient] - gradient = fd.Function(self.projection_space, name="Gradient") - gradient.assign(0) if self.is_masked_combination: - for g_, mask_ in zip(updated_gradient, self.mappings): - gradient += g_ * mask_ + if hasattr(self.mappings, 'project'): + gradient = self.mappings.project(updated_gradient, name="Gradient") + else: + gradient = fd.Function(self.projection_space, name="Gradient") + gradient.assign(0) + for g_, mask_ in zip(updated_gradient, self.mappings): + gradient += g_ * mask_ else: # domain_constant + gradient = fd.Function(self.projection_space, name="Gradient") gradient.project(updated_gradient[0]) self.gradient_vtk_file.write(gradient) @@ -217,9 +441,9 @@ def add_control(self, controls, mappings=None): ---------- controls : fd.Function or list/tuple of fd.Function The Function(s) representing the control parameters. - mappings : list of fd.Function, optional - Masks for multi-control cases. If provided, `controls` must be a list of Functions - matching the masks. + mappings : list of fd.Function or object with a ``project`` method, optional + Masks or a mapping object for multi-control cases. If provided, + `controls` must be a list of Functions matching the mapping. Notes ----- @@ -230,6 +454,9 @@ def add_control(self, controls, mappings=None): but changing it mid-solve may lead to incorrect gradients or mis-evaluation of the ReducedFunctional. - Masked combinations are supported via lists of Functions combined with masks. + Mapping objects such as :class:`IndependentPointControlMapping` can + also be used when the control-to-field operation needs custom + adjoint handling. """ if not isinstance(controls, (list, tuple)): controls = [controls] @@ -335,7 +562,7 @@ def update_progress(self): if not self.no_exports: ref_index = 0 for cm in self.control_managers: - num_controls = len(cm.mappings) if cm.mappings is not None else 1 + num_controls = cm.num_controls controls_slice = self.m_list[ref_index: ref_index + num_controls] cm.export(controls_slice) gradient_slice = self.dJdm_list[ref_index: ref_index + num_controls]