diff --git a/optika/__init__.py b/optika/__init__.py index 67ab71b..06a2e53 100644 --- a/optika/__init__.py +++ b/optika/__init__.py @@ -9,6 +9,7 @@ from . import vectors from . import targets from . import rays +from . import wavefields from . import propagators from . import metrology from . import sags @@ -32,6 +33,7 @@ "vectors", "targets", "rays", + "wavefields", "propagators", "metrology", "sags", diff --git a/optika/surfaces.py b/optika/surfaces.py index ae25f1d..124c563 100644 --- a/optika/surfaces.py +++ b/optika/surfaces.py @@ -7,6 +7,7 @@ from typing import TypeVar, Generic import abc import dataclasses +import numpy as np import numpy.typing as npt import matplotlib.axes from astropy import units as u @@ -197,6 +198,293 @@ def propagate_rays( return rays_2 + def wavefield_samples( + self, + axis: tuple[str, str], + num: int | na.AbstractCartesian2dVectorArray = 10_000, + seed: None | int = None, + bound: ( + None + | tuple[ + na.AbstractCartesian2dVectorArray, + na.AbstractCartesian2dVectorArray, + ] + ) = None, + ) -> optika.wavefields.WavefieldVectorArray: + """ + Stratified-random samples of this surface's aperture, in local surface + coordinates, with unit amplitude on unvignetted samples and zero + amplitude elsewhere. + + The :attr:`~optika.wavefields.WavefieldVectorArray.wavelength` and + :attr:`~optika.wavefields.WavefieldVectorArray.index_refraction` + fields are left at their defaults for the caller to fill in. + + Parameters + ---------- + axis + The two logical axes of the new grid of samples. + num + The approximate total number of samples, + or the number of samples along each axis. + seed + An optional seed for the random number generator. + bound + The lower and upper corners of the region to sample, in local + surface coordinates. + If :obj:`None` (the default), the bounding box of the aperture is + used. + This parameter is required for surfaces with inverted apertures + (obscurations), since the aperture of such surfaces does not + bound the beam. + """ + aperture = self.aperture + if aperture is None: + raise ValueError( + f"surface {self.name!r} must have an aperture to be sampled " + f"by a wavefield." + ) + + if bound is None: + if aperture.inverted: + raise ValueError( + f"the aperture of surface {self.name!r} is inverted, " + f"so the sampling region must be given using the `bound` " + f"parameter." + ) + lower = aperture.bound_lower.xy + upper = aperture.bound_upper.xy + else: + lower, upper = bound + + if not na.unit_normalized(lower).is_equivalent(u.mm): + raise ValueError( + f"the aperture of surface {self.name!r} must be expressed in " + f"physical units to be sampled by a wavefield." + ) + + if not isinstance(num, na.AbstractCartesian2dVectorArray): + num_x = int(np.round(np.sqrt(num))) + num = na.Cartesian2dVectorArray(num_x, num_x) + + grid = na.Cartesian2dVectorStratifiedRandomSpace( + start=lower, + stop=upper, + axis=na.Cartesian2dVectorArray(*axis), + num=num, + seed=seed, + ).explicit + + sag = self.sag + if sag is None: + sag = optika.sags.NoSag() + + position = na.Cartesian3dVectorArray( + x=grid.x, + y=grid.y, + z=0 * na.unit_normalized(grid.x), + ) + position.z = sag(position) + + where = aperture(position) + + normal = sag.normal(position) + + area = (upper.x - lower.x) * (upper.y - lower.y) / (num.x * num.y) + area = area / np.abs(normal.z) + + return optika.wavefields.WavefieldVectorArray( + position=position, + amplitude=np.where(where, 1 + 0j, 0j), + normal=normal, + area=area, + unvignetted=where, + ) + + def _interact_wavefield( + self, + wavefield: optika.wavefields.AbstractWavefieldVectorArray, + direction: na.AbstractCartesian3dVectorArray, + ) -> optika.wavefields.WavefieldVectorArray: + """ + Apply this surface's material and rulings to a wavefield sampled on + this surface. + + The amplitude is multiplied by the square root of the material + efficiency (and of the ruling efficiency, if rulings are present), + the ruling phase function is applied, + and the index of refraction is updated to that of the new medium. + + Parameters + ---------- + wavefield + A wavefield sampled on this surface, expressed in system + coordinates. + direction + The effective incidence direction of the light at each sample + point, expressed in system coordinates. + """ + material = self.material + if material is None: + material = optika.materials.Vacuum() + + sag = self.sag + if sag is None: + sag = optika.sags.NoSag() + + rulings = self.rulings + transformation = self.transformation + + rays = optika.rays.RayVectorArray( + wavelength=wavefield.wavelength, + position=wavefield.position, + direction=direction, + index_refraction=wavefield.index_refraction, + ) + if transformation is not None: + rays = transformation.inverse(rays) + + normal = sag.normal(rays.position) + + amplitude = wavefield.amplitude + + if rulings is not None: + spacing = rulings.spacing_ + position = rays.position + + # The phase shift imparted by the rulings is + # 2 pi m N(r), where N(r) is the groove-counting function whose + # gradient is the local groove density. + # Integrate the groove density along the chord from the local + # origin to each sample point using Gauss-Legendre quadrature, + # which is exact for constant ruling spacing. + nodes, weights = np.polynomial.legendre.leggauss(32) + nodes = (nodes + 1) / 2 + weights = weights / 2 + + grooves = 0 + for node, weight in zip(nodes, weights): + kappa = spacing(node * position, normal) + density = (position @ kappa) / np.square(kappa.length) + grooves = grooves + weight * density + grooves = grooves.to(u.dimensionless_unscaled).value + + # The same orientation convention as + # optika.rulings.incident_effective. + sign = np.sign(na.as_named_array(rays.direction @ normal).value) + + phase = 2 * np.pi * rulings.diffraction_order * grooves + amplitude = amplitude * np.exp(1j * sign * phase) + amplitude = amplitude * np.sqrt(rulings.efficiency(rays, normal)) + + efficiency = material.efficiency(rays, normal) + + return dataclasses.replace( + wavefield, + amplitude=amplitude * np.sqrt(efficiency), + index_refraction=material.index_refraction(rays), + ) + + def propagate_wavefield( + self, + wavefield: optika.wavefields.AbstractWavefieldVectorArray, + axis: tuple[str, str], + axis_new: tuple[str, str], + num: int | na.AbstractCartesian2dVectorArray = 10_000, + chunk_size: int = 1024, + seed: None | int = None, + bound: ( + None + | tuple[ + na.AbstractCartesian2dVectorArray, + na.AbstractCartesian2dVectorArray, + ] + ) = None, + ) -> optika.wavefields.WavefieldVectorArray: + """ + Propagate the given wavefield to this surface by evaluating the + Rayleigh-Sommerfeld diffraction integral at a new set of stratified + random samples of this surface's aperture. + + Both the given and the resulting wavefield are expressed in system + coordinates. + + Parameters + ---------- + wavefield + The wavefield sampled on the previous surface, expressed in + system coordinates. + axis + The two logical axes of the samples of the given wavefield, + which are summed over by the diffraction integral. + axis_new + The two logical axes of the new samples on this surface. + Must be distinct from `axis`. + num + The approximate total number of samples on this surface, + or the number of samples along each axis. + chunk_size + The maximum number of destination points considered + simultaneously by the diffraction integral. + seed + An optional seed for the random number generator. + bound + The lower and upper corners of the region of this surface to + sample, in local surface coordinates. + If :obj:`None` (the default), the bounding box of this surface's + aperture is used, except for inverted apertures (obscurations), + where the footprint of the incoming wavefield is used instead. + """ + for ax in axis_new: + if ax in axis: + raise ValueError( + f"`axis_new`, {axis_new}, must be distinct from " f"`axis`, {axis}." + ) + + if (bound is None) and (self.aperture is not None): + if self.aperture.inverted: + # The aperture of an obscuration does not bound the beam, + # so sample the footprint of the incoming wavefield instead. + position_local = wavefield.position + if self.transformation is not None: + position_local = self.transformation.inverse(position_local) + position_local = position_local.xy + lower = position_local.min() + upper = position_local.max() + padding = (upper - lower) / 10 + bound = (lower - padding, upper + padding) + + target = self.wavefield_samples( + axis=axis_new, + num=num, + seed=seed, + bound=bound, + ) + + if self.transformation is not None: + target = self.transformation(target) + + amplitude = optika.wavefields.rayleigh_sommerfeld( + wavefield=wavefield, + position=target.position, + axis=axis, + chunk_size=chunk_size, + ) + + target.amplitude = target.amplitude * amplitude + target.wavelength = wavefield.wavelength + target.index_refraction = wavefield.index_refraction + + weight = np.square(np.abs(wavefield.amplitude)) + centroid = (wavefield.position * weight).sum(axis) / weight.sum(axis) + direction = target.position - centroid + direction = direction / direction.length + + return self._interact_wavefield( + wavefield=target, + direction=direction, + ) + def plot( self, ax: None | matplotlib.axes.Axes | na.ScalarArray[npt.NDArray] = None, diff --git a/optika/systems/_sequential.py b/optika/systems/_sequential.py index a0dbbbe..9da4dbf 100644 --- a/optika/systems/_sequential.py +++ b/optika/systems/_sequential.py @@ -746,6 +746,350 @@ def rayfunction_default(self) -> optika.rays.RayFunctionArray: """ return self.rayfunction() + def wavefield( + self, + wavelength: None | u.Quantity | na.AbstractScalar = None, + field: None | na.AbstractCartesian2dVectorArray = None, + position_sensor: None | na.AbstractCartesian2dVectorArray = None, + width_sensor: None | u.Quantity | na.AbstractScalar = None, + num: int | na.AbstractCartesian2dVectorArray | Sequence = 10_000, + num_sensor: int = 101, + axis: tuple[str, str] = ("wavefield_x", "wavefield_y"), + chunk_size: int = 1024, + normalized_field: bool = True, + seed: None | int = None, + bound: None | Sequence = None, + ) -> optika.wavefields.WavefieldFunctionArray: + r""" + Propagate a wavefield through this system using physical optics and + evaluate it on a grid of sensor positions. + + The wavefield is computed by sampling each surface's aperture with a + stratified random grid and evaluating the Rayleigh-Sommerfeld + diffraction integral surface by surface, + :func:`optika.wavefields.rayleigh_sommerfeld`. + + Only surfaces with an aperture participate in the propagation, + all other surfaces are ignored. + + Parameters + ---------- + wavelength + The vacuum wavelength of the wavefield. + If :obj:`None` (the default), ``self.grid_input.wavelength`` + will be used. + field + The field positions of the incident light, in either normalized + or physical units. + If :obj:`None` (the default), ``self.grid_input.field`` + will be used. + position_sensor + The sensor-local positions at which to evaluate the wavefield. + If :obj:`None` (the default), a grid of `num_sensor` positions + along each axis, spanning `width_sensor` and centered on the + centroid of a geometric raytrace, will be used. + width_sensor + The width of the automatic grid of sensor positions. + Only used if `position_sensor` is :obj:`None`. + num + The approximate number of samples on each surface. + If a sequence, it must have the same number of elements as the + number of surfaces with an aperture. + num_sensor + The number of sensor positions along each axis. + Only used if `position_sensor` is :obj:`None`. + axis + The two logical axes of the grid of sensor positions. + chunk_size + The maximum number of destination points considered + simultaneously by the diffraction integral. + See :func:`optika.wavefields.rayleigh_sommerfeld` for a + description of how this parameter bounds the memory usage. + normalized_field + A boolean flag indicating whether the `field` parameter is given + in normalized or physical units. + seed + An optional seed for the random number generators used to sample + each surface. + bound + An optional sequence of sampling regions, one for each surface + with an aperture, where each element is either :obj:`None` (use + the bounding box of the surface's aperture) or a tuple of the + lower and upper corners of the region in local surface + coordinates. + This is important for surfaces near an intermediate focus, + where the wavefield is concentrated in a region much smaller than + the aperture and would otherwise be unresolved by the samples. + + Notes + ----- + The brute-force Rayleigh-Sommerfeld integral is only well-sampled if + the phase of its integrand varies by less than about :math:`\pi` + between neighboring samples. + This is naturally satisfied when consecutive surfaces are close to + conjugate (each surface is near an image of the previous one, or near + a focus of the light leaving it), but it is violated for plane-to-plane + propagation with a large Fresnel number, such as between two + collimated-space apertures. + Undersampled propagation manifests as a speckle-like noise floor + rather than an error in the mean, so check convergence by increasing + `num`. + Surfaces that only mask the beam in collimated space (obscurations, + filters) are often better excluded from the wave propagation + (by setting their aperture to :obj:`None` in a copy of the system) + than included as an undersampled propagation step. + """ + if wavelength is None: + wavelength = self.grid_input.wavelength + if field is None: + field = self.grid_input.field + + grid = optika.vectors.ObjectVectorArray( + wavelength=wavelength, + field=field, + pupil=na.Cartesian2dVectorArray(0, 0), + ) + grid = self._denormalize_grid( + grid=grid, + normalized_field=normalized_field, + normalized_pupil=False, + ) + + surfaces = [s for s in self.surfaces if s.aperture is not None] + if not surfaces: + raise ValueError( + "at least one surface in this system must have an aperture " + "to propagate a wavefield." + ) + + sensor = self.sensor + if sensor is None: + raise ValueError("this system must have a sensor to propagate a wavefield.") + + if isinstance(num, (int, np.integer, na.AbstractCartesian2dVectorArray)): + num = [num] * len(surfaces) + if len(num) != len(surfaces): + raise ValueError( + f"`num` must have one element for each of the " + f"{len(surfaces)} surfaces with an aperture, got {len(num)}." + ) + + if bound is None: + bound = [None] * len(surfaces) + if len(bound) != len(surfaces): + raise ValueError( + f"`bound` must have one element for each of the " + f"{len(surfaces)} surfaces with an aperture, got {len(bound)}." + ) + + axes = [ + (f"_{axis[0]}_{k % 2}", f"_{axis[1]}_{k % 2}") for k in range(len(surfaces)) + ] + seeds = [seed if seed is None else seed + k for k in range(len(surfaces))] + + surface_0 = surfaces[0] + + bound_0 = bound[0] + if (bound_0 is None) and surface_0.aperture.inverted: + # The aperture of an obscuration does not bound the beam, + # so sample the footprint of the pupil stop instead. + stop = self.pupil_stop + corner_lower = stop.aperture.bound_lower + corner_upper = stop.aperture.bound_upper + if stop.transformation is not None: + corner_lower = stop.transformation(corner_lower) + corner_upper = stop.transformation(corner_upper) + if surface_0.transformation is not None: + corner_lower = surface_0.transformation.inverse(corner_lower) + corner_upper = surface_0.transformation.inverse(corner_upper) + bound_0 = ( + np.minimum(corner_lower.xy, corner_upper.xy), + np.maximum(corner_lower.xy, corner_upper.xy), + ) + + wavefield = surface_0.wavefield_samples( + axis=axes[0], + num=num[0], + seed=seeds[0], + bound=bound_0, + ) + if surface_0.transformation is not None: + wavefield = surface_0.transformation(wavefield) + wavefield.wavelength = grid.wavelength + + obj = self.object + + if self.object_is_at_infinity: + rays = optika.rays.RayVectorArray( + position=na.Cartesian3dVectorArray() * u.mm, + direction=optika.direction(grid.field), + ) + if obj is not None: + if obj.transformation is not None: + rays = obj.transformation(rays) + if self.transformation is not None: + rays = self.transformation.inverse(rays) + + direction = rays.direction + + phase = (direction @ wavefield.position) / grid.wavelength + phase = phase.to(u.dimensionless_unscaled).value + wavefield.amplitude = wavefield.amplitude * np.exp(2j * np.pi * phase) + else: + rays = optika.rays.RayVectorArray( + position=na.Cartesian3dVectorArray( + x=grid.field.x, + y=grid.field.y, + z=0 * na.unit_normalized(grid.field.x), + ), + direction=na.Cartesian3dVectorArray(0, 0, 1), + ) + if obj is not None: + if obj.transformation is not None: + rays = obj.transformation(rays) + if self.transformation is not None: + rays = self.transformation.inverse(rays) + + displacement = wavefield.position - rays.position + distance = displacement.length + direction = displacement / distance + + phase = distance / grid.wavelength + phase = phase.to(u.dimensionless_unscaled).value + scale = distance.mean(axes[0]) / distance + wavefield.amplitude = ( + wavefield.amplitude * scale * np.exp(2j * np.pi * phase) + ) + + wavefield = surface_0._interact_wavefield( + wavefield=wavefield, + direction=direction, + ) + + for k, surface in enumerate(surfaces[1:], start=1): + wavefield = surface.propagate_wavefield( + wavefield=wavefield, + axis=axes[k - 1], + axis_new=axes[k], + num=num[k], + chunk_size=chunk_size, + seed=seeds[k], + bound=bound[k], + ) + + axis_last = axes[len(surfaces) - 1] + + if position_sensor is None: + if width_sensor is None: + raise ValueError( + "either `position_sensor` or `width_sensor` must be " "specified." + ) + + rayfunction = self.rayfunction( + wavelength=grid.wavelength, + field=grid.field, + normalized_field=False, + ) + position_spot = rayfunction.outputs.position.xy + where = rayfunction.outputs.unvignetted + axis_centroid = tuple( + set(na.shape(position_spot)) + - set(na.shape(grid.wavelength)) + - set(na.shape(grid.field)) + - set(self.shape) + ) + centroid = (position_spot * where).sum(axis_centroid) + centroid = centroid / where.sum(axis_centroid) + + position_sensor = na.Cartesian2dVectorLinearSpace( + start=centroid - width_sensor / 2, + stop=centroid + width_sensor / 2, + axis=na.Cartesian2dVectorArray(*axis), + num=num_sensor, + ).explicit + + sag_sensor = sensor.sag + if sag_sensor is None: + sag_sensor = optika.sags.NoSag() + + position = na.Cartesian3dVectorArray( + x=position_sensor.x, + y=position_sensor.y, + z=0 * na.unit_normalized(position_sensor.x), + ) + position.z = sag_sensor(position) + + position_global = position + if sensor.transformation is not None: + position_global = sensor.transformation(position) + + amplitude = optika.wavefields.rayleigh_sommerfeld( + wavefield=wavefield, + position=position_global, + axis=axis_last, + chunk_size=chunk_size, + ) + + if sensor.aperture is not None: + amplitude = amplitude * sensor.aperture(position) + + shape_sensor = na.shape(position_sensor) + if (axis[0] in shape_sensor) and (axis[1] in shape_sensor): + num_x = shape_sensor[axis[0]] + num_y = shape_sensor[axis[1]] + area = position_sensor.x.ptp(axis[0]) / (num_x - 1) + area = area * position_sensor.y.ptp(axis[1]) / (num_y - 1) + else: + area = 0 * u.mm**2 + + outputs = optika.wavefields.WavefieldVectorArray( + wavelength=grid.wavelength, + position=position, + amplitude=amplitude, + normal=sag_sensor.normal(position), + area=area, + index_refraction=wavefield.index_refraction, + ) + + return optika.wavefields.WavefieldFunctionArray( + inputs=optika.vectors.ObjectVectorArray( + wavelength=grid.wavelength, + field=grid.field, + pupil=position_sensor, + ), + outputs=outputs, + ) + + def psf( + self, + axis: tuple[str, str] = ("wavefield_x", "wavefield_y"), + **kwargs, + ) -> na.FunctionArray[na.SpectralPositionalVectorArray, na.AbstractScalar]: + r""" + The Huygens point-spread function of this system: the normalized + intensity of the wavefield computed by :meth:`wavefield` on a grid of + sensor positions. + + Parameters + ---------- + axis + The two logical axes of the grid of sensor positions. + kwargs + Additional keyword arguments passed to :meth:`wavefield`. + """ + wavefield = self.wavefield(axis=axis, **kwargs) + + intensity = np.square(np.abs(wavefield.outputs.amplitude)) + intensity = intensity / intensity.sum(axis) + + return na.FunctionArray( + inputs=na.SpectralPositionalVectorArray( + wavelength=wavefield.inputs.wavelength, + position=wavefield.outputs.position.xy, + ), + outputs=intensity, + ) + def _rayfunction_from_vertices( self, radiance: na.AbstractScalar, diff --git a/optika/systems/_wavefield_test.py b/optika/systems/_wavefield_test.py new file mode 100644 index 0000000..8479507 --- /dev/null +++ b/optika/systems/_wavefield_test.py @@ -0,0 +1,450 @@ +"""Tests of the physical-optics propagation methods of SequentialSystem.""" + +import dataclasses +import pytest +import numpy as np +import astropy.units as u +import named_arrays as na +import optika + + +def _telescope( + sag_primary: None | optika.sags.AbstractSag = None, + z_sensor: u.Quantity = 0 * u.mm, +) -> optika.systems.SequentialSystem: + """An on-axis parabolic telescope with focal length 200 mm and D=40 mm.""" + if sag_primary is None: + sag_primary = optika.sags.ParabolicSag(focal_length=-200 * u.mm) + + primary = optika.surfaces.Surface( + name="primary", + sag=sag_primary, + aperture=optika.apertures.CircularAperture(20 * u.mm), + material=optika.materials.Mirror(), + is_pupil_stop=True, + transformation=na.transformations.Cartesian3dTranslation(z=200 * u.mm), + ) + sensor = optika.sensors.ImagingSensor( + name="sensor", + width_pixel=20 * u.um, + axis_pixel=na.Cartesian2dVectorArray("detector_x", "detector_y"), + num_pixel=na.Cartesian2dVectorArray(128, 128), + timedelta_exposure=1 * u.s, + is_field_stop=True, + transformation=na.transformations.Cartesian3dTranslation(z=z_sensor), + ) + return optika.systems.SequentialSystem( + surfaces=[primary], + sensor=sensor, + grid_input=optika.vectors.ObjectVectorArray( + wavelength=500 * u.nm, + field=na.Cartesian2dVectorLinearSpace( + start=-1, + stop=1, + axis=na.Cartesian2dVectorArray("field_x", "field_y"), + num=5, + centers=True, + ), + pupil=na.Cartesian2dVectorLinearSpace( + start=-1, + stop=1, + axis=na.Cartesian2dVectorArray("pupil_x", "pupil_y"), + num=5, + centers=True, + ), + ), + ) + + +def _rms_radius( + psf: na.FunctionArray, + axis: tuple[str, str] = ("wavefield_x", "wavefield_y"), +) -> u.Quantity: + """The intensity-weighted RMS radius of a point-spread function.""" + intensity = psf.outputs + position = psf.inputs.position + centroid = (position * intensity).sum(axis) / intensity.sum(axis) + r2 = np.square((position - centroid).length) + return np.sqrt((r2 * intensity).sum(axis) / intensity.sum(axis)) + + +def test_psf_airy(): + """ + The on-axis Huygens PSF of an ideal parabolic telescope should be an + Airy pattern with its first zero at a radius of + :math:`1.22 \\lambda f / D`. + """ + system = _telescope() + + psf = system.psf( + field=na.Cartesian2dVectorArray(0, 0), + width_sensor=0.02 * u.mm, + num=101**2, + num_sensor=51, + seed=42, + ) + + intensity = psf.outputs + + assert np.allclose(intensity.sum(), 1) + + index_peak = np.unravel_index( + np.argmax(intensity.ndarray), + intensity.ndarray.shape, + ) + index_center = (51 // 2, 51 // 2) + assert index_peak == index_center + + cut = intensity[dict(wavefield_y=51 // 2)].ndarray + cut = cut / cut.max() + minima = [ + i for i in range(51 // 2 + 1, 50) if cut[i] < cut[i - 1] and cut[i] < cut[i + 1] + ] + assert minima + + x = psf.inputs.position.x + if "wavefield_y" in x.shape: + x = x[dict(wavefield_y=51 // 2)] + x = x.ndarray + radius_zero = np.abs(x[minima[0]] - x[51 // 2]) + + radius_airy = (1.22 * 500 * u.nm * 200 / 40).to(u.mm) + + # The grid resolution limits how precisely the zero can be located. + resolution = 0.02 * u.mm / 50 + assert np.abs(radius_zero - radius_airy) < resolution + + assert cut[minima[0]] < 1e-2 + + +def test_wavefield_centroid_matches_raytrace(): + """ + For an off-axis field angle, the centroid of the Huygens PSF should match + the centroid of a geometric raytrace to within a fraction of the Airy + radius. + """ + system = _telescope() + field = na.Cartesian2dVectorArray(0.5, 0) + + wavefield = system.wavefield( + field=field, + width_sensor=0.02 * u.mm, + num=101**2, + num_sensor=51, + seed=11, + ) + intensity = np.square(np.abs(wavefield.outputs.amplitude)) + position = wavefield.outputs.position.xy + centroid = (position * intensity).sum() / intensity.sum() + + rayfunction = system.rayfunction(field=field) + where = rayfunction.outputs.unvignetted + position_rays = rayfunction.outputs.position.xy + centroid_rays = (position_rays * where).sum() / where.sum() + + radius_airy = (1.22 * 500 * u.nm * 200 / 40).to(u.mm) + + assert np.abs(centroid.x - centroid_rays.x) < radius_airy / 2 + assert np.abs(centroid.y - centroid_rays.y) < radius_airy / 2 + + +def test_psf_zernike_defocus(): + """ + Adding a Zernike defocus term to the primary mirror shifts the focus by + an analytic amount: the PSF at the shifted sensor position should be much + sharper than at the nominal position. + """ + focal_length = 200 * u.mm + radius = 20 * u.mm + c = 0.5 * u.um + + sag = optika.sags.ZernikeSag( + base=optika.sags.ParabolicSag(focal_length=-focal_length), + coefficients=[0 * u.um, 0 * u.um, 0 * u.um, -c], + radius=radius, + ) + + focal_length_perturbed = 1 / ( + 1 / focal_length + 8 * np.sqrt(3) * c / np.square(radius) + ) + z_focus = focal_length - focal_length_perturbed + + kwargs = dict( + field=na.Cartesian2dVectorArray(0, 0), + width_sensor=0.2 * u.mm, + num=101**2, + num_sensor=51, + seed=3, + ) + + psf_nominal = _telescope(sag_primary=sag).psf(**kwargs) + psf_shifted = _telescope(sag_primary=sag, z_sensor=z_focus).psf(**kwargs) + + # Both PSFs are normalized to unit total energy over the same window, + # so the peak intensity measures the concentration of the light. + peak_nominal = psf_nominal.outputs.max() + peak_shifted = psf_shifted.outputs.max() + + assert peak_shifted > 5 * peak_nominal + + # Refocusing should also reduce the geometric spot size. + rms_nominal = _rms_radius(psf_nominal) + rms_shifted = _rms_radius(psf_shifted) + + assert rms_shifted < rms_nominal + + +def test_wavefield_multiwavelength(): + """The wavefield should broadcast over wavelength and field axes.""" + system = _telescope() + + wavelength = na.linspace(450, 550, axis="wavelength", num=2) * u.nm + field = na.Cartesian2dVectorArray( + x=na.linspace(-0.5, 0.5, axis="field", num=2), + y=0, + ) + + wavefield = system.wavefield( + wavelength=wavelength, + field=field, + width_sensor=0.05 * u.mm, + num=31**2, + num_sensor=11, + seed=5, + ) + + amplitude = wavefield.outputs.amplitude + assert "wavelength" in amplitude.shape + assert "field" in amplitude.shape + assert "wavefield_x" in amplitude.shape + assert "wavefield_y" in amplitude.shape + assert np.all(np.isfinite(np.abs(amplitude.ndarray))) + + +def test_psf_newtonian(): + """ + The Huygens PSF of the Newtonian telescope from the SequentialSystem + docstring, which exercises fold mirrors (the obliquity sign) and central + obscurations (beam-footprint sampling). + """ + primary_mirror_z = 200 * u.mm + fold_mirror_z = 50 * u.mm + sensor_x = 50 * u.mm + + primary_mirror = optika.surfaces.Surface( + name="mirror", + sag=optika.sags.ParabolicSag( + focal_length=-(primary_mirror_z - fold_mirror_z + sensor_x), + ), + aperture=optika.apertures.RectangularAperture(40 * u.mm), + material=optika.materials.Mirror(), + is_pupil_stop=True, + transformation=na.transformations.Cartesian3dTranslation( + z=primary_mirror_z, + ), + ) + fold_mirror = optika.surfaces.Surface( + name="fold_mirror", + aperture=optika.apertures.RectangularAperture(25 * u.mm), + material=optika.materials.Mirror(), + transformation=na.transformations.TransformationList( + [ + na.transformations.Cartesian3dRotationY((90 + 45) * u.deg), + na.transformations.Cartesian3dTranslation(z=fold_mirror_z), + ] + ), + ) + obscuration = optika.surfaces.Surface( + name="obscuration", + aperture=dataclasses.replace(fold_mirror.aperture, inverted=True), + transformation=fold_mirror.transformation, + ) + sensor = optika.sensors.ImagingSensor( + name="sensor", + width_pixel=20 * u.um, + axis_pixel=na.Cartesian2dVectorArray("detector_x", "detector_y"), + num_pixel=na.Cartesian2dVectorArray(128, 128), + timedelta_exposure=1 * u.s, + transformation=na.transformations.TransformationList( + [ + na.transformations.Cartesian3dRotationY(-90 * u.deg), + na.transformations.Cartesian3dTranslation( + x=-sensor_x, + z=fold_mirror_z, + ), + ] + ), + is_field_stop=True, + ) + system = optika.systems.SequentialSystem( + surfaces=[ + optika.surfaces.Surface(name="front"), + obscuration, + primary_mirror, + fold_mirror, + ], + sensor=sensor, + grid_input=optika.vectors.ObjectVectorArray( + wavelength=500 * u.nm, + field=na.Cartesian2dVectorLinearSpace( + start=-1, + stop=1, + axis=na.Cartesian2dVectorArray("field_x", "field_y"), + num=5, + centers=True, + ), + pupil=na.Cartesian2dVectorLinearSpace( + start=-1, + stop=1, + axis=na.Cartesian2dVectorArray("pupil_x", "pupil_y"), + num=5, + centers=True, + ), + ), + ) + + field = na.Cartesian2dVectorArray(0, 0) + + wavefield = system.wavefield( + field=field, + width_sensor=0.05 * u.mm, + num=101**2, + num_sensor=51, + seed=9, + ) + intensity = np.square(np.abs(wavefield.outputs.amplitude)) + position = wavefield.outputs.position.xy + centroid = (position * intensity).sum() / intensity.sum() + + rayfunction = system.rayfunction(field=field) + where = rayfunction.outputs.unvignetted + position_rays = rayfunction.outputs.position.xy + centroid_rays = (position_rays * where).sum() / where.sum() + + assert np.abs(centroid.x - centroid_rays.x) < 5 * u.um + assert np.abs(centroid.y - centroid_rays.y) < 5 * u.um + + +def test_wavefield_grating(): + """ + For a concave diffraction grating used in first order, the centroid of + the Huygens PSF should match the centroid of a geometric raytrace. + This exercises the ruling phase function of the wave-propagation path, + including its sign convention: an incorrect sign would displace the + Huygens spot by twice the (millimeter-scale) dispersion offset. + """ + grating = optika.surfaces.Surface( + name="grating", + sag=optika.sags.SphericalSag(radius=-400 * u.mm), + aperture=optika.apertures.CircularAperture(15 * u.mm), + material=optika.materials.Mirror(), + rulings=optika.rulings.Rulings( + spacing=20 * u.um, + diffraction_order=1, + ), + is_pupil_stop=True, + transformation=na.transformations.Cartesian3dTranslation(z=200 * u.mm), + ) + sensor = optika.sensors.ImagingSensor( + name="sensor", + width_pixel=50 * u.um, + axis_pixel=na.Cartesian2dVectorArray("detector_x", "detector_y"), + num_pixel=na.Cartesian2dVectorArray(256, 256), + timedelta_exposure=1 * u.s, + is_field_stop=True, + ) + system = optika.systems.SequentialSystem( + surfaces=[grating], + sensor=sensor, + grid_input=optika.vectors.ObjectVectorArray( + wavelength=500 * u.nm, + field=na.Cartesian2dVectorLinearSpace( + start=-1, + stop=1, + axis=na.Cartesian2dVectorArray("field_x", "field_y"), + num=5, + centers=True, + ), + pupil=na.Cartesian2dVectorLinearSpace( + start=-1, + stop=1, + axis=na.Cartesian2dVectorArray("pupil_x", "pupil_y"), + num=5, + centers=True, + ), + ), + ) + + field = na.Cartesian2dVectorArray(0, 0) + + wavefield = system.wavefield( + field=field, + width_sensor=0.2 * u.mm, + num=151**2, + num_sensor=51, + seed=4, + ) + intensity = np.square(np.abs(wavefield.outputs.amplitude)) + position = wavefield.outputs.position.xy + centroid = (position * intensity).sum() / intensity.sum() + + rayfunction = system.rayfunction(field=field) + where = rayfunction.outputs.unvignetted + position_rays = rayfunction.outputs.position.xy + centroid_rays = (position_rays * where).sum() / where.sum() + + assert np.abs(centroid.x - centroid_rays.x) < 2 * u.um + assert np.abs(centroid.y - centroid_rays.y) < 2 * u.um + + +def test_wavefield_invalid(): + system = _telescope() + + with pytest.raises(ValueError, match="width_sensor"): + system.wavefield(field=na.Cartesian2dVectorArray(0, 0)) + + with pytest.raises(ValueError, match="num"): + system.wavefield( + field=na.Cartesian2dVectorArray(0, 0), + width_sensor=0.02 * u.mm, + num=[100, 100], + ) + + +def test_wavefield_samples_decentered_aperture(): + """ + The stratified samples must cover an aperture that is decentered by its + internal transformation. + + Regression test: the polygonal-aperture bounding boxes used to ignore + the internal transformation, so the sampling grid covered the + untranslated footprint and only its overlap with the true aperture + received nonzero amplitude (an effective pupil far smaller than the + real one). + """ + half_width = 5 * u.mm + decenter = 12 * u.mm + surface = optika.surfaces.Surface( + name="decentered", + aperture=optika.apertures.RectangularAperture( + half_width=half_width, + transformation=na.transformations.Cartesian3dTranslation(x=decenter), + ), + ) + + samples = surface.wavefield_samples(axis=("sx", "sy"), num=51**2, seed=0) + + shape = na.shape_broadcasted( + samples.position.x, + samples.position.y, + samples.amplitude, + ) + where = np.abs(na.broadcast_to(samples.amplitude, shape).ndarray) > 0 + x = na.broadcast_to(samples.position.x, shape).ndarray[where] + y = na.broadcast_to(samples.position.y, shape).ndarray[where] + + assert np.any(where) + assert x.min() < decenter - 0.9 * half_width + assert x.max() > decenter + 0.9 * half_width + assert y.min() < -0.9 * half_width + assert y.max() > +0.9 * half_width diff --git a/optika/wavefields/__init__.py b/optika/wavefields/__init__.py new file mode 100644 index 0000000..843fce9 --- /dev/null +++ b/optika/wavefields/__init__.py @@ -0,0 +1,18 @@ +""" +Physical-optics propagation of scalar wavefields through an optical system. +""" + +from ._wavefield_vectors import AbstractWavefieldVectorArray, WavefieldVectorArray +from ._wavefield_functions import ( + AbstractWavefieldFunctionArray, + WavefieldFunctionArray, +) +from ._propagation import rayleigh_sommerfeld + +__all__ = [ + "AbstractWavefieldVectorArray", + "WavefieldVectorArray", + "AbstractWavefieldFunctionArray", + "WavefieldFunctionArray", + "rayleigh_sommerfeld", +] diff --git a/optika/wavefields/_propagation.py b/optika/wavefields/_propagation.py new file mode 100644 index 0000000..b67d0c6 --- /dev/null +++ b/optika/wavefields/_propagation.py @@ -0,0 +1,164 @@ +from __future__ import annotations +from typing import Sequence +import itertools +import numpy as np +import astropy.units as u +import named_arrays as na +import optika + +__all__ = [ + "rayleigh_sommerfeld", +] + + +def rayleigh_sommerfeld( + wavefield: optika.wavefields.AbstractWavefieldVectorArray, + position: na.AbstractCartesian3dVectorArray, + axis: str | Sequence[str], + chunk_size: int = 1024, +) -> na.AbstractScalar: + r""" + Evaluate the Rayleigh-Sommerfeld diffraction integral of the first kind + to find the complex amplitude of the given wavefield at the given + destination points. + + Parameters + ---------- + wavefield + The source wavefield, sampled at a discrete set of points on a + surface, expressed in the same coordinate system as `position`. + position + The destination points at which to evaluate the diffracted wavefield. + axis + The one or more logical axes of the source wavefield over which the + integral is evaluated. + chunk_size + The maximum number of destination points to consider simultaneously. + The peak memory usage of this function is approximately + :math:`16 \, N_\text{chunk} N_\text{source} N_\text{broadcast}` + bytes, where :math:`N_\text{broadcast}` is the number of elements + along the axes shared by `wavefield` and `position` + (wavelength, field angle, etc.). + + Notes + ----- + The Rayleigh-Sommerfeld diffraction integral of the first kind is + + .. math:: + + E(\mathbf{r}_2) = \frac{n}{i \lambda} \int_S E(\mathbf{r}_1) + \frac{e^{i k r_{12}}}{r_{12}} \cos \theta \, dA, + + where :math:`E(\mathbf{r}_1)` is the source wavefield, + :math:`r_{12} = |\mathbf{r}_2 - \mathbf{r}_1|` is the distance between + the source and destination points, + :math:`k = 2 \pi n / \lambda` is the wavenumber in the current medium, + :math:`n` is the index of refraction, + :math:`\lambda` is the vacuum wavelength, + and :math:`\theta` is the angle between :math:`\mathbf{r}_2 - \mathbf{r}_1` + and the surface normal at the source point. + + The integral is evaluated as a discrete sum over the source samples. + To bound memory usage, the destination points are split into chunks of at + most `chunk_size` points, and the outer product of source and destination + points is never materialized in full. + """ + if isinstance(axis, str): + axis = (axis,) + axis = tuple(axis) + + shape_src = wavefield.shape + for ax in axis: + if ax not in shape_src: + raise ValueError( + f"axis {ax!r} is not in the wavefield's shape, {shape_src}." + ) + + unit = u.mm + + position_src = wavefield.position.to(unit).value + xs = position_src.x + ys = position_src.y + zs = position_src.z + + normal = wavefield.normal + nx = normal.x + ny = normal.y + nz = normal.z + + wavelength = na.as_named_array(wavefield.wavelength).to(unit).value + index_refraction = na.as_named_array(wavefield.index_refraction) + + k = 2 * np.pi * index_refraction / wavelength + + amp = wavefield.amplitude * wavefield.area.to(unit**2).value + + # The sampling convention for surface normals allows them to point away + # from the destination surface (e.g. after a reflection), so compute the + # orientation of the obliquity factor once for the whole integral using + # the centroid of the destination points. + axis_dst = tuple(ax for ax in position.shape if ax not in shape_src) + centroid_dst = position.mean(axis_dst) if axis_dst else position + dot = (centroid_dst - wavefield.position) @ normal + axis_mean = tuple(ax for ax in axis if ax in na.shape(dot)) + if axis_mean: + dot = dot.mean(axis_mean) + sign = np.sign(na.as_named_array(dot).value) + + position_dst = position.to(unit).value + + shape_dst = {ax: position.shape[ax] for ax in axis_dst} + + shape_src_broadcast = {ax: num for ax, num in shape_src.items() if ax not in axis} + shape_result = na.broadcast_shapes(shape_src_broadcast, position.shape) + + result = na.ScalarArray( + ndarray=np.empty(tuple(shape_result.values()), dtype=complex), + axes=tuple(shape_result), + ) + + # Greedily choose a chunk length along each destination axis (starting + # from the last) so that the total number of destination points per chunk + # is at most `chunk_size`. + chunks = dict() + budget = max(chunk_size, 1) + for ax in reversed(tuple(shape_dst)): + chunks[ax] = min(shape_dst[ax], budget) + budget = max(budget // shape_dst[ax], 1) + + starts = itertools.product( + *[range(0, shape_dst[ax], chunks[ax]) for ax in shape_dst] + ) + + expression = ( + "amp" + " * ((xd - xs) * nx + (yd - ys) * ny + (zd - zs) * nz)" + " * exp(1j * k * sqrt((xd - xs)**2 + (yd - ys)**2 + (zd - zs)**2))" + " / ((xd - xs)**2 + (yd - ys)**2 + (zd - zs)**2)" + ) + + for start in starts: + index = {ax: slice(i, i + chunks[ax]) for ax, i in zip(shape_dst, start)} + + chunk = position_dst[index] + + term = na.numexpr.evaluate( + ex=expression, + local_dict=dict( + amp=amp, + xs=xs, + ys=ys, + zs=zs, + nx=nx, + ny=ny, + nz=nz, + k=k, + xd=chunk.x, + yd=chunk.y, + zd=chunk.z, + ), + ) + + result[index] = term.sum(axis) + + return result * sign * index_refraction / (1j * wavelength) diff --git a/optika/wavefields/_tests/__init__.py b/optika/wavefields/_tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/optika/wavefields/_tests/test_propagation.py b/optika/wavefields/_tests/test_propagation.py new file mode 100644 index 0000000..66547ef --- /dev/null +++ b/optika/wavefields/_tests/test_propagation.py @@ -0,0 +1,233 @@ +import pytest +import numpy as np +import astropy.units as u +import named_arrays as na +import optika + + +def _aperture_wavefield( + half_width: u.Quantity, + wavelength: u.Quantity, + num: int, + amplitude_function, + seed: int = 42, +) -> optika.wavefields.WavefieldVectorArray: + """A flat, square aperture at the origin with the given amplitude function.""" + grid = na.Cartesian2dVectorStratifiedRandomSpace( + start=-half_width, + stop=half_width, + axis=na.Cartesian2dVectorArray("sx", "sy"), + num=num, + seed=seed, + ).explicit + + return optika.wavefields.WavefieldVectorArray( + wavelength=wavelength, + position=na.Cartesian3dVectorArray(grid.x, grid.y, 0 * half_width), + amplitude=amplitude_function(grid), + normal=na.Cartesian3dVectorArray(0, 0, -1), + area=np.square(2 * half_width / num), + ) + + +def _focusing_phase( + grid: na.AbstractCartesian2dVectorArray, + wavelength: u.Quantity, + focal_length: u.Quantity, +) -> na.AbstractScalar: + """The complex phase of a perfect lens with the given focal length.""" + distance = np.sqrt(np.square(grid.length) + np.square(focal_length)) + phase = (distance / wavelength).to_value(u.dimensionless_unscaled) + return np.exp(-2j * np.pi * phase) + + +def test_airy_pattern(): + """ + A uniformly-illuminated circular aperture with a perfect focusing phase + should produce an Airy pattern with its first intensity zero at a radius + of :math:`1.22 \\lambda f / D`. + """ + wavelength = 500 * u.nm + radius = 5 * u.mm + focal_length = 1000 * u.mm + + def amplitude(grid: na.AbstractCartesian2dVectorArray) -> na.AbstractScalar: + phase = _focusing_phase(grid, wavelength, focal_length) + return np.where(grid.length <= radius, phase, 0) + + wavefield = _aperture_wavefield( + half_width=radius, + wavelength=wavelength, + num=101, + amplitude_function=amplitude, + ) + + radius_airy = (1.22 * wavelength * focal_length / (2 * radius)).to(u.mm) + + x = na.linspace(0, 2 * radius_airy, axis="dx", num=121) + position = na.Cartesian3dVectorArray(x, 0 * u.mm, focal_length) + + amplitude_detector = optika.wavefields.rayleigh_sommerfeld( + wavefield=wavefield, + position=position, + axis=("sx", "sy"), + ) + + intensity = np.square(np.abs(amplitude_detector)) + intensity = (intensity / intensity.max()).ndarray + + minima = [ + i + for i in range(1, len(intensity) - 1) + if intensity[i] < intensity[i - 1] and intensity[i] < intensity[i + 1] + ] + assert minima + + radius_zero = x.ndarray[minima[0]] + assert np.abs(radius_zero - radius_airy) < 0.02 * radius_airy + + # The first zero should be dark and the peak should be on axis. + assert intensity[minima[0]] < 1e-3 + assert np.argmax(intensity) == 0 + + +def test_gaussian_apodization(): + """ + A Gaussian-apodized aperture with waist :math:`w` and a perfect focusing + phase should produce a Gaussian point-spread function with standard + deviation :math:`\\sigma = \\lambda f / (2 \\pi w)`. + """ + wavelength = 500 * u.nm + waist = 1 * u.mm + focal_length = 1000 * u.mm + + def amplitude(grid: na.AbstractCartesian2dVectorArray) -> na.AbstractScalar: + apodization = np.exp( + -np.square(grid.length / waist).to_value(u.dimensionless_unscaled) + ) + return apodization * _focusing_phase(grid, wavelength, focal_length) + + wavefield = _aperture_wavefield( + half_width=3 * waist, + wavelength=wavelength, + num=101, + amplitude_function=amplitude, + ) + + sigma = (wavelength * focal_length / (2 * np.pi * waist)).to(u.mm) + + x = na.linspace(-4 * sigma, 4 * sigma, axis="dx", num=161) + position = na.Cartesian3dVectorArray(x, 0 * u.mm, focal_length) + + amplitude_detector = optika.wavefields.rayleigh_sommerfeld( + wavefield=wavefield, + position=position, + axis=("sx", "sy"), + ) + + intensity = np.square(np.abs(amplitude_detector)) + + sigma_measured = np.sqrt((intensity * np.square(x)).sum() / intensity.sum()) + + assert np.abs(sigma_measured - sigma) < 0.03 * sigma + + +def test_energy_conservation(): + """ + The total energy on a detector plane that captures essentially all of + the diffracted light should equal the total energy of the source. + """ + wavelength = 500 * u.nm + waist = 1 * u.mm + distance = 2000 * u.mm + + def amplitude(grid: na.AbstractCartesian2dVectorArray) -> na.AbstractScalar: + return ( + np.exp(-np.square(grid.length / waist).to_value(u.dimensionless_unscaled)) + + 0j + ) + + wavefield = _aperture_wavefield( + half_width=3 * waist, + wavelength=wavelength, + num=151, + amplitude_function=amplitude, + seed=7, + ) + + energy_source = (np.square(np.abs(wavefield.amplitude)) * wavefield.area).sum() + + half_width_detector = 6 * u.mm + num_detector = 151 + grid_detector = na.Cartesian2dVectorLinearSpace( + start=-half_width_detector, + stop=half_width_detector, + axis=na.Cartesian2dVectorArray("dx", "dy"), + num=num_detector, + ).explicit + area_detector = np.square(2 * half_width_detector / num_detector) + + amplitude_detector = optika.wavefields.rayleigh_sommerfeld( + wavefield=wavefield, + position=na.Cartesian3dVectorArray( + x=grid_detector.x, + y=grid_detector.y, + z=distance, + ), + axis=("sx", "sy"), + ) + + energy_detector = (np.square(np.abs(amplitude_detector)) * area_detector).sum() + + assert np.abs(energy_detector / energy_source - 1) < 0.01 + + +def test_chunk_invariance(): + """The result should not depend on the chunk size.""" + wavelength = 500 * u.nm + radius = 1 * u.mm + + def amplitude(grid: na.AbstractCartesian2dVectorArray) -> na.AbstractScalar: + return np.where(grid.length <= radius, 1 + 0j, 0) + + wavefield = _aperture_wavefield( + half_width=radius, + wavelength=wavelength, + num=21, + amplitude_function=amplitude, + ) + + position = na.Cartesian3dVectorLinearSpace( + start=na.Cartesian3dVectorArray(-1, -1, 99) * u.mm, + stop=na.Cartesian3dVectorArray(1, 1, 101) * u.mm, + axis=na.Cartesian3dVectorArray("dx", "dy", "dz"), + num=na.Cartesian3dVectorArray(7, 6, 5), + ).explicit + + results = [ + optika.wavefields.rayleigh_sommerfeld( + wavefield=wavefield, + position=position, + axis=("sx", "sy"), + chunk_size=chunk_size, + ) + for chunk_size in (1, 17, 10**6) + ] + + assert np.allclose(results[0], results[1], rtol=1e-9) + assert np.allclose(results[0], results[2], rtol=1e-9) + + +def test_invalid_axis(): + wavefield = optika.wavefields.WavefieldVectorArray( + wavelength=500 * u.nm, + position=na.Cartesian3dVectorArray(0, 0, 0) * u.mm, + normal=na.Cartesian3dVectorArray(0, 0, -1), + area=1 * u.mm**2, + ) + with pytest.raises(ValueError): + optika.wavefields.rayleigh_sommerfeld( + wavefield=wavefield, + position=na.Cartesian3dVectorArray(0, 0, 1) * u.mm, + axis="missing", + ) diff --git a/optika/wavefields/_tests/test_wavefield_vectors.py b/optika/wavefields/_tests/test_wavefield_vectors.py new file mode 100644 index 0000000..8f6a730 --- /dev/null +++ b/optika/wavefields/_tests/test_wavefield_vectors.py @@ -0,0 +1,86 @@ +import pytest +import numpy as np +import astropy.units as u +import named_arrays as na +import optika + +wavefields = [ + optika.wavefields.WavefieldVectorArray( + wavelength=500 * u.nm, + position=na.Cartesian3dVectorArray(0, 1, 2) * u.mm, + amplitude=1 + 0j, + normal=na.Cartesian3dVectorArray(0, 0, -1), + area=1 * u.mm**2, + ), + optika.wavefields.WavefieldVectorArray( + wavelength=500 * u.nm, + position=na.Cartesian3dVectorLinearSpace( + start=-10 * u.mm, + stop=10 * u.mm, + axis="s", + num=5, + ).explicit, + amplitude=np.exp(1j * na.linspace(0, np.pi, axis="s", num=5)), + normal=na.Cartesian3dVectorArray(0, 0, -1), + area=4 * u.mm**2, + ), +] + + +@pytest.mark.parametrize("array", wavefields) +class TestWavefieldVectorArray: + + def test_amplitude(self, array: optika.wavefields.AbstractWavefieldVectorArray): + amplitude = na.as_named_array(array.amplitude) + assert isinstance(amplitude, na.AbstractScalar) + assert na.unit_normalized(amplitude).is_equivalent(u.dimensionless_unscaled) + + def test_normal(self, array: optika.wavefields.AbstractWavefieldVectorArray): + normal = array.normal + assert isinstance(normal, na.AbstractCartesian3dVectorArray) + assert np.allclose(normal.length, 1) + + def test_area(self, array: optika.wavefields.AbstractWavefieldVectorArray): + area = na.as_named_array(array.area) + assert isinstance(area, na.AbstractScalar) + assert na.unit_normalized(area).is_equivalent(u.mm**2) + + def test_translation( + self, + array: optika.wavefields.AbstractWavefieldVectorArray, + ): + transformation = na.transformations.Cartesian3dTranslation( + x=1 * u.mm, + z=2 * u.mm, + ) + result = transformation(array) + + assert isinstance(result, optika.wavefields.AbstractWavefieldVectorArray) + assert np.all(result.position.x == array.position.x + 1 * u.mm) + assert np.all(result.position.z == array.position.z + 2 * u.mm) + + # Translations should not affect the surface normal. + assert np.all(result.normal == array.normal) + + assert result.amplitude is array.amplitude + assert result.wavelength is array.wavelength + assert result.area is array.area + + def test_rotation( + self, + array: optika.wavefields.AbstractWavefieldVectorArray, + ): + transformation = na.transformations.Cartesian3dRotationY(90 * u.deg) + result = transformation(array) + + assert isinstance(result, optika.wavefields.AbstractWavefieldVectorArray) + + matrix = na.Cartesian3dYRotationMatrixArray(90 * u.deg) + + # Rotations should rotate both the position and the normal. + assert np.allclose(result.position, matrix @ array.position) + assert np.allclose(result.normal, matrix @ array.normal) + + assert result.amplitude is array.amplitude + assert result.wavelength is array.wavelength + assert result.area is array.area diff --git a/optika/wavefields/_wavefield_functions.py b/optika/wavefields/_wavefield_functions.py new file mode 100644 index 0000000..df4c3b6 --- /dev/null +++ b/optika/wavefields/_wavefield_functions.py @@ -0,0 +1,38 @@ +from __future__ import annotations +import dataclasses +import named_arrays as na +import optika +from ._wavefield_vectors import WavefieldVectorArray + +__all__ = [ + "AbstractWavefieldFunctionArray", + "WavefieldFunctionArray", +] + + +@dataclasses.dataclass(eq=False, repr=False) +class AbstractWavefieldFunctionArray( + na.AbstractFunctionArray, +): + """ + An interface describing a function which maps input wavelength and field + coordinates to an output wavefield. + """ + + @property + def type_explicit(self) -> type[WavefieldFunctionArray]: + return WavefieldFunctionArray + + +@dataclasses.dataclass(eq=False, repr=False) +class WavefieldFunctionArray( + AbstractWavefieldFunctionArray, + na.FunctionArray[ + optika.vectors.ObjectVectorArray, + WavefieldVectorArray, + ], +): + """ + A discrete function which maps input wavelength and field coordinates to + an output wavefield. + """ diff --git a/optika/wavefields/_wavefield_vectors.py b/optika/wavefields/_wavefield_vectors.py new file mode 100644 index 0000000..d21f8a8 --- /dev/null +++ b/optika/wavefields/_wavefield_vectors.py @@ -0,0 +1,229 @@ +from __future__ import annotations +from typing import TypeVar, Generic +import abc +import dataclasses +import numpy as np +import astropy.units as u +import named_arrays as na + +__all__ = [ + "AbstractWavefieldVectorArray", + "WavefieldVectorArray", +] + +AmplitudeT = TypeVar("AmplitudeT", bound=na.ScalarLike) +PositionT = TypeVar("PositionT", bound=na.Cartesian3dVectorArray) +NormalT = TypeVar("NormalT", bound=na.Cartesian3dVectorArray) +AreaT = TypeVar("AreaT", bound=na.ScalarLike) +WavelengthT = TypeVar("WavelengthT", bound=na.ScalarLike) +IndexRefractionT = TypeVar("IndexRefractionT", bound=na.ScalarLike) +UnvignettedT = TypeVar("UnvignettedT", bound=na.ScalarLike) + + +@dataclasses.dataclass(eq=False, repr=False) +class AbstractWavefieldVectorArray( + na.AbstractSpectralPositionalVectorArray, +): + """ + An interface describing a complex scalar wavefield sampled at a discrete + set of points on a surface. + """ + + @property + @abc.abstractmethod + def amplitude(self) -> na.AbstractScalar: + """ + The dimensionless, complex scalar amplitude of the wavefield at each + sample point. + """ + + @property + @abc.abstractmethod + def normal(self) -> na.AbstractCartesian3dVectorArray: + """The unit vector perpendicular to the surface at each sample point.""" + + @property + @abc.abstractmethod + def area(self) -> na.AbstractScalar: + """The area of the surface element associated with each sample point.""" + + @property + @abc.abstractmethod + def index_refraction(self) -> na.AbstractScalar: + """ + The index of refraction of the medium the wavefield is propagating + into. + + Note that :attr:`wavelength` is always the vacuum wavelength, + the medium enters the propagation only through this index. + """ + + @property + @abc.abstractmethod + def unvignetted(self) -> na.AbstractScalar: + """ + A boolean mask where :obj:`False` indicates the sample point was + blocked by an aperture (the amplitude is also zero there). + """ + + @property + def type_abstract(self) -> type[na.AbstractArray]: + return AbstractWavefieldVectorArray + + @property + def type_explicit(self) -> type[na.AbstractExplicitArray]: + return WavefieldVectorArray + + @property + def type_matrix(self) -> type[na.AbstractMatrixArray]: + raise NotImplementedError + + @property + def explicit(self) -> WavefieldVectorArray: + return super().explicit + + def __array_matmul__( + self, + x1: na.ArrayLike, + x2: na.ArrayLike, + out: tuple[None | na.AbstractExplicitArray] = (None,), + **kwargs, + ) -> na.AbstractExplicitArray | WavefieldVectorArray: + result = super().__array_matmul__( + x1=x1, + x2=x2, + out=out, + **kwargs, + ) + if result is not NotImplemented: + return result + + if isinstance(x1, AbstractWavefieldVectorArray): + if isinstance(x2, na.AbstractCartesian3dMatrixArray): + return dataclasses.replace( + x1, + position=x1.position @ x2, + normal=x1.normal @ x2, + ) + else: + return NotImplemented + elif isinstance(x2, AbstractWavefieldVectorArray): + if isinstance(x1, na.AbstractCartesian3dMatrixArray): + return dataclasses.replace( + x2, + position=x1 @ x2.position, + normal=x1 @ x2.normal, + ) + else: + return NotImplemented + else: + return NotImplemented + + def __array_add__( + self, + x1: na.ArrayLike, + x2: na.ArrayLike, + out: tuple[None | na.AbstractExplicitArray] = (None,), + **kwargs, + ) -> na.AbstractExplicitArray: + if isinstance(x1, AbstractWavefieldVectorArray): + if isinstance(x2, na.AbstractCartesian3dVectorArray): + return dataclasses.replace( + x1, + position=x1.position + x2, + ) + else: + return NotImplemented + elif isinstance(x2, AbstractWavefieldVectorArray): + if isinstance(x1, na.AbstractCartesian3dVectorArray): + return dataclasses.replace( + x2, + position=x1 + x2.position, + ) + else: + return NotImplemented + else: + return NotImplemented + + def __array_ufunc__( + self, + function, + method: str, + *inputs, + **kwargs, + ) -> None | WavefieldVectorArray | tuple[WavefieldVectorArray, ...]: + result = super().__array_ufunc__( + function, + method, + *inputs, + **kwargs, + ) + if result is not NotImplemented: + return result + + if function is np.add: + if method == "__call__": + return self.__array_add__(*inputs, **kwargs) + + +@dataclasses.dataclass(eq=False, repr=False) +class WavefieldVectorArray( + AbstractWavefieldVectorArray, + na.SpectralPositionalVectorArray[WavelengthT, PositionT], + Generic[ + AmplitudeT, + PositionT, + NormalT, + AreaT, + WavelengthT, + IndexRefractionT, + UnvignettedT, + ], +): + """ + A complex scalar wavefield sampled at a discrete set of points on a + surface. + + Note that the amplitude is dimensionless: the Rayleigh-Sommerfeld kernel + factor :math:`\\cos \\theta \\, dA / (\\lambda r)` is dimensionless, so the + amplitude stays dimensionless as the wavefield propagates through an + optical system, and intensities are typically normalized at the end of + the calculation (e.g. for a point-spread function). + """ + + amplitude: AmplitudeT = 1 + 0j + """ + The dimensionless, complex scalar amplitude of the wavefield at each + sample point. + """ + + normal: NormalT = 0 + """The unit vector perpendicular to the surface at each sample point.""" + + area: AreaT = 0 * u.mm**2 + """The area of the surface element associated with each sample point.""" + + index_refraction: IndexRefractionT = 1 + """The index of refraction of the medium the wavefield is propagating into.""" + + unvignetted: UnvignettedT = True + """ + A boolean mask where :obj:`False` indicates the sample point was blocked + by an aperture. + """ + + @classmethod + def from_scalar( + cls, + scalar: na.AbstractScalar, + like: None | na.AbstractExplicitVectorArray = None, + ) -> WavefieldVectorArray: + return cls( + wavelength=scalar, + position=scalar, + amplitude=scalar, + normal=scalar, + area=scalar, + index_refraction=scalar, + unvignetted=scalar, + )