diff --git a/psiflow/__init__.py b/psiflow/__init__.py index 73f2e3f..8f0d341 100644 --- a/psiflow/__init__.py +++ b/psiflow/__init__.py @@ -5,8 +5,8 @@ from .serialization import ( # noqa: F401 _DataFuture, deserialize, - serializable, serialize, + register_serializable, ) @@ -30,4 +30,3 @@ def resolve_and_check(path: Path) -> Path: load = ExecutionContextLoader.load context = ExecutionContextLoader.context wait = ExecutionContextLoader.wait - diff --git a/psiflow/data/dataset.py b/psiflow/data/dataset.py index 660928c..21bb476 100644 --- a/psiflow/data/dataset.py +++ b/psiflow/data/dataset.py @@ -36,7 +36,7 @@ ) -@psiflow.serializable +@psiflow.register_serializable class Dataset: """ A class representing a dataset of atomic structures. diff --git a/psiflow/execution.py b/psiflow/execution.py index 4d4aaef..5d46a59 100644 --- a/psiflow/execution.py +++ b/psiflow/execution.py @@ -287,6 +287,7 @@ def __init__( # TODO: how to handle env variables? # disable thread affinity and busy-idling until we can isolate task resources default_env_vars = { + "PYTHONUNBUFFERED": "TRUE", "OMP_PROC_BIND": "FALSE", "OMP_WAIT_POLICY": "PASSIVE", "OMP_DISPLAY_ENV": "VERBOSE", # verbose OMP log diff --git a/psiflow/functions.py b/psiflow/functions.py index 8689616..267c3b9 100644 --- a/psiflow/functions.py +++ b/psiflow/functions.py @@ -297,7 +297,7 @@ def __post_init__(self): # OMP_NUM_THREADS for parallel evaluation does not work.. # https://github.com/dftd3/simple-dftd3/issues/49 # TODO: check whether this is still the case - os.environ["OMP_NUM_THREADS"] = str(self.num_threads * 10) + os.environ["OMP_NUM_THREADS"] = str(self.num_threads) from dftd3.ase import DFTD3 diff --git a/psiflow/hamiltonians.py b/psiflow/hamiltonians.py index 59ca43e..c8faee9 100644 --- a/psiflow/hamiltonians.py +++ b/psiflow/hamiltonians.py @@ -1,7 +1,7 @@ import urllib from functools import partial from pathlib import Path -from typing import ClassVar, Optional, Union, Callable, Sequence +from typing import Optional, Union, Callable, Sequence import numpy as np from parsl.app.app import python_app @@ -26,13 +26,16 @@ from psiflow.utils.io import dump_json -@psiflow.serializable +apply_threads = python_app(_apply, executors=["default_threads"]) +apply_htex = python_app(_apply, executors=["default_htex"]) +apply_modelevaluation = python_app(_apply, executors=["ModelEvaluation"]) + + class Hamiltonian(Computable): - # TODO: app is actually an instance variable, but serialization complains.. - outputs: ClassVar[tuple] = ("energy", "forces", "stress") + outputs: tuple = ("energy", "forces", "stress") batch_size = 1000 - app: ClassVar[Callable] - function_name: ClassVar[str] + app: Callable + function_name: str def compute( self, @@ -44,12 +47,7 @@ def compute( outputs = tuple(self.__class__.outputs) if batch_size == -1: batch_size = self.__class__.batch_size - return compute( - arg, - self.app, - outputs_=outputs, - batch_size=batch_size, - ) + return compute(arg, self.get_app(), outputs_=outputs, batch_size=batch_size) def __eq__(self, hamiltonian: "Hamiltonian") -> bool: raise NotImplementedError @@ -80,32 +78,13 @@ def serialize_function(self, **kwargs) -> DataFuture: ).outputs[0] def parameters(self) -> dict: - return {} - - -@psiflow.serializable -class Zero(Hamiltonian): - - def __init__(self): - apply_zero = python_app(_apply, executors=["default_threads"]) - self.app = partial(apply_zero, function_cls=ZeroFunction) - - def __eq__(self, hamiltonian: Hamiltonian) -> bool: - if type(hamiltonian) is Zero: - return True - return False - - def __mul__(self, a: float) -> "Zero": - return Zero() - - def __add__(self, hamiltonian: Hamiltonian) -> Hamiltonian: - # (Zero + Hamiltonian) is different from (Hamiltonian + Zero) - return hamiltonian + raise NotImplementedError - __rmul__ = __mul__ # handle float * Zero + def get_app(self) -> Callable: + raise NotImplementedError -@psiflow.serializable +@psiflow.register_serializable class MixtureHamiltonian(Hamiltonian): hamiltonians: list[Hamiltonian] coefficients: list[float] @@ -127,10 +106,9 @@ def compute( # override compute for efficient batching ) -> Union[list[AppFuture], AppFuture]: if outputs is None: outputs = list(self.__class__.outputs) - apply_apps = [h.app for h in self.hamiltonians] + apply_apps = [h.get_app() for h in self.hamiltonians] reduce_func = partial( - aggregate_multiple, - coefficients=np.array(self.coefficients), + aggregate_multiple, coefficients=np.array(self.coefficients) ) return compute( arg, @@ -231,25 +209,47 @@ def serialize(self, **kwargs) -> list[DataFuture]: return [h.serialize_function(**kwargs) for h in self.hamiltonians] -@psiflow.serializable +@psiflow.register_serializable +class Zero(Hamiltonian): + function_name: str = "ZeroFunction" + + def __init__(self): + pass + + def get_app(self) -> Callable: + return partial(apply_threads, function_cls=ZeroFunction) + + def __eq__(self, hamiltonian: Hamiltonian) -> bool: + if type(hamiltonian) is Zero: + return True + return False + + def __mul__(self, a: float) -> "Zero": + return Zero() + + def __add__(self, hamiltonian: Hamiltonian) -> Hamiltonian: + # (Zero + Hamiltonian) is different from (Hamiltonian + Zero) + return hamiltonian + + __rmul__ = __mul__ # handle float * Zero + + +@psiflow.register_serializable class EinsteinCrystal(Hamiltonian): - reference_geometry: Union[Geometry, AppFuture] + # TODO: logic not consistent depending on Geometry | AppFuture + reference_geometry: Geometry | AppFuture force_constant: float - function_name: ClassVar[str] = "EinsteinCrystalFunction" + function_name: str = "EinsteinCrystalFunction" - def __init__( - self, geometry: Union[Geometry, AppFuture[Geometry]], force_constant: float - ): + def __init__(self, geometry: Union[Geometry, AppFuture], force_constant: float): super().__init__() self.reference_geometry = copy_app_future(geometry) self.force_constant = force_constant self.external = None # needed - self._create_apps() - def _create_apps(self): - apply_app = python_app(_apply, executors=["default_threads"]) - self.app = partial( - apply_app, function_cls=EinsteinCrystalFunction, **self.parameters() + def get_app(self) -> Callable: + return partial( + apply_threads, function_cls=EinsteinCrystalFunction, **self.parameters() ) def parameters(self) -> dict: @@ -269,11 +269,11 @@ def __eq__(self, hamiltonian: Hamiltonian) -> bool: return True -@psiflow.serializable +@psiflow.register_serializable class PlumedHamiltonian(Hamiltonian): plumed_input: str # TODO: or future? external: Optional[psiflow._DataFuture] - function_name: ClassVar[str] = "PlumedFunction" + function_name: str = "PlumedFunction" def __init__( self, @@ -281,22 +281,15 @@ def __init__( external: Union[None, str, Path, File, DataFuture] = None, ): super().__init__() - self.plumed_input = remove_comments_printflush(plumed_input) if type(external) in [str, Path]: external = File(str(external)) if external is not None: assert external.filepath in self.plumed_input self.external = external - self._create_apps() - def _create_apps(self): - apply_app = python_app(_apply, executors=["default_htex"]) - self.app = partial( - apply_app, - function_cls=PlumedFunction, - **self.parameters(), - ) + def get_app(self) -> Callable: + return partial(apply_htex, function_cls=PlumedFunction, **self.parameters()) def parameters(self) -> dict: if self.external is not None: # ensure parameters depends on self.external @@ -314,28 +307,24 @@ def __eq__(self, other: Hamiltonian) -> bool: return True -@psiflow.serializable +@psiflow.register_serializable class Harmonic(Hamiltonian): - reference_geometry: Union[Geometry, AppFuture[Geometry]] - hessian: Union[np.ndarray, AppFuture[np.ndarray]] - function_name: ClassVar[str] = "HarmonicFunction" + reference_geometry: Geometry | AppFuture + hessian: np.ndarray | AppFuture + function_name: str = "HarmonicFunction" def __init__( self, - reference_geometry: Union[Geometry, AppFuture[Geometry]], - hessian: Union[np.ndarray, AppFuture[np.ndarray]], + reference_geometry: Geometry | AppFuture, + hessian: np.ndarray | AppFuture, ): # TODO: why not copy_app_future(geometry) like others? self.reference_geometry = reference_geometry self.hessian = hessian - self._create_apps() - def _create_apps(self): - apply_app = python_app(_apply, executors=["default_threads"]) - self.app = partial( - apply_app, - function_cls=HarmonicFunction, - **self.parameters(), + def get_app(self) -> Callable: + return partial( + apply_threads, function_cls=HarmonicFunction, **self.parameters() ) def parameters(self) -> dict: @@ -367,30 +356,26 @@ def __eq__(self, hamiltonian: Hamiltonian) -> bool: return True -@psiflow.serializable +@psiflow.register_serializable class D3Hamiltonian(Hamiltonian): method: str damping: str - function_name: ClassVar[str] = "DispersionFunction" + function_name: str = "DispersionFunction" def __init__(self, method: str, damping: str = "d3bj"): self.method, self.damping = method, damping - self._create_apps() - def _create_apps(self): - # TODO: does this make sense? GPU settings are useless for example + def get_app(self) -> Callable: + # execution-side parameters of function are not included in self.parameters() evaluation = psiflow.context().definitions["ModelEvaluation"] - apply_app = python_app(_apply, executors=["ModelEvaluation"]) resources = evaluation.wq_resources(1) - resources.pop("gpus", None) - - # execution-side parameters of function are not included in self.parameters() - self.app = partial( - apply_app, + resources.pop("gpus", None) # do not request GPU + return partial( + apply_modelevaluation, function_cls=DispersionFunction, parsl_resource_specification=resources, **self.parameters(), - num_threads=resources.get("cores", 1), # TODO: sloppy + num_threads=resources.get("cores"), ) def parameters(self) -> dict: @@ -406,11 +391,11 @@ def __eq__(self, hamiltonian: Hamiltonian) -> bool: return True -@psiflow.serializable +@psiflow.register_serializable class MACEHamiltonian(Hamiltonian): external: psiflow._DataFuture atomic_energies: dict[str, float] - function_name: ClassVar[str] = "MACEFunction" + function_name: str = "MACEFunction" def __init__( self, @@ -422,16 +407,13 @@ def __init__( self.external = File(external) else: self.external = external - self._create_apps() - def _create_apps(self): + def get_app(self) -> Callable: + # execution-side parameters of function are not included in self.parameters() evaluation = psiflow.context().definitions["ModelEvaluation"] - apply_app = python_app(_apply, executors=["ModelEvaluation"]) resources = evaluation.wq_resources(1) - - # execution-side parameters of function are not included in self.parameters() - self.app = partial( - apply_app, + return partial( + apply_modelevaluation, function_cls=MACEFunction, parsl_resource_specification=resources, **self.parameters(), @@ -469,7 +451,7 @@ def __eq__(self, hamiltonian: Hamiltonian) -> bool: # TODO: the methods below are outdated.. @classmethod - def mace_mp0(cls, size: str = "small") -> 'MACEHamiltonian': + def mace_mp0(cls, size: str = "small") -> "MACEHamiltonian": urls = dict( small="https://github.com/ACEsuit/mace-mp/releases/download/mace_mp_0/2023-12-10-mace-128-L0_energy_epoch-249.model", # 2023-12-10-mace-128-L0_energy_epoch-249.model large="https://github.com/ACEsuit/mace-mp/releases/download/mace_mp_0/2023-12-03-mace-128-L1_epoch-199.model", @@ -483,7 +465,7 @@ def mace_mp0(cls, size: str = "small") -> 'MACEHamiltonian': return cls(parsl_file, {}) @classmethod - def mace_cc(cls) -> 'MACEHamiltonian': + def mace_cc(cls) -> "MACEHamiltonian": url = "https://github.com/molmod/psiflow/raw/main/examples/data/ani500k_cc_cpu.model" parsl_file = psiflow.context().new_file("mace_mp_", ".pth") urllib.request.urlretrieve( diff --git a/psiflow/learning.py b/psiflow/learning.py index eab93d1..4ba8ec6 100644 --- a/psiflow/learning.py +++ b/psiflow/learning.py @@ -100,7 +100,7 @@ def evaluate_outputs( @typeguard.typechecked -@psiflow.serializable +# @psiflow.serializable class Learning: reference: Reference path_output: str diff --git a/psiflow/metrics.py b/psiflow/metrics.py index 0234642..7e78976 100644 --- a/psiflow/metrics.py +++ b/psiflow/metrics.py @@ -454,7 +454,7 @@ def _initialize_wandb( @typeguard.typechecked -@psiflow.serializable +# @psiflow.serializable class Metrics: wandb_group: Optional[str] wandb_project: Optional[str] diff --git a/psiflow/models/_mace.py b/psiflow/models/_mace.py index 46fc04d..67d241c 100644 --- a/psiflow/models/_mace.py +++ b/psiflow/models/_mace.py @@ -3,7 +3,7 @@ import logging from dataclasses import asdict, dataclass from functools import partial -from typing import Optional, Union +from typing import Optional, Union, Callable import typeguard from parsl.app.app import bash_app @@ -14,7 +14,6 @@ from psiflow.hamiltonians import MACEHamiltonian from psiflow.models.model import Model -# TODO: not used logger = logging.getLogger(__name__) # logging per module @@ -199,11 +198,11 @@ def train( @typeguard.typechecked -@psiflow.serializable +@psiflow.register_serializable class MACE(Model): _config: dict model_future: Optional[psiflow._DataFuture] - atomic_energies: dict[str, Union[float, AppFuture]] + atomic_energies: dict[str, float | AppFuture] def __init__(self, **config) -> None: config = MACEConfig(**config) # validate input @@ -216,32 +215,29 @@ def __init__(self, **config) -> None: self.model_future = None self.atomic_energies = {} - self._create_apps() + def get_train_app(self) -> Callable: + training = psiflow.context().definitions["ModelTraining"] + return partial( + bash_app(train, executors=[training.name]), + command_train=training.train_command(False), + env_vars=training.env_vars, + parsl_resource_specification=training.wq_resources(), + ) - def _create_apps(self): # initialize apps + def get_initialise_app(self) -> Callable: evaluation = psiflow.context().definitions["ModelEvaluation"] training = psiflow.context().definitions["ModelTraining"] command_init = training.train_command(True) - command_train = training.train_command(False) - app_initialize = bash_app(initialize, executors=[evaluation.name]) - resources_init = evaluation.wq_resources(1) # TODO: find a better way for model init + resources_init = evaluation.wq_resources(1) if not evaluation.executor_type == "threadpool": resources_init["running_time_min"] = 30 # at least 30 mins for init? - app_train = bash_app(train, executors=[training.name]) - resources_train = training.wq_resources() - self._initialize = partial( + return partial( app_initialize, command_train=command_init, parsl_resource_specification=resources_init, ) - self._train = partial( - app_train, - command_train=command_train, - env_vars=training.env_vars, - parsl_resource_specification=resources_train, - ) @property def seed(self) -> int: diff --git a/psiflow/models/model.py b/psiflow/models/model.py index 73c9cb1..84d3938 100644 --- a/psiflow/models/model.py +++ b/psiflow/models/model.py @@ -1,9 +1,10 @@ from __future__ import annotations # necessary for type-guarding class methods +import copy import logging from dataclasses import asdict from pathlib import Path -from typing import Optional, Union +from typing import Optional, Union, Callable import parsl import typeguard @@ -20,7 +21,7 @@ @typeguard.typechecked -@psiflow.serializable +@psiflow.register_serializable # TODO: not required? you should always subclass this class Model: _config: dict model_future: Optional[psiflow._DataFuture] @@ -57,11 +58,9 @@ def train(self, training: Dataset, validation: Dataset) -> None: validation.subtract_offset(**self.atomic_energies).extxyz, ] else: - inputs += [ - training.extxyz, - validation.extxyz, - ] - future = self._train( + inputs += [training.extxyz, validation.extxyz] + train_app = self.get_train_app() + future = train_app( dict(self._config), stdout=parsl.AUTO_LOGNAME, stderr=parsl.AUTO_LOGNAME, @@ -77,7 +76,8 @@ def initialize(self, dataset: Dataset) -> None: inputs = [dataset.subtract_offset(**self.atomic_energies).extxyz] else: inputs = [dataset.extxyz] - future = self._initialize( + initialise_app = self.get_initialise_app() + future = initialise_app( self._config, stdout=parsl.AUTO_LOGNAME, stderr=parsl.AUTO_LOGNAME, @@ -126,6 +126,12 @@ def copy(self) -> Model: ).outputs[0] return model + def get_train_app(self) -> Callable: + raise NotImplementedError + + def get_initialise_app(self) -> Callable: + raise NotImplementedError + @property def do_offset(self) -> bool: return len(self.atomic_energies) > 0 diff --git a/psiflow/reference/cp2k_.py b/psiflow/reference/cp2k_.py index 4d707f8..014a1e4 100644 --- a/psiflow/reference/cp2k_.py +++ b/psiflow/reference/cp2k_.py @@ -1,7 +1,8 @@ import copy import io import warnings -from typing import Optional, Union, ClassVar +from typing import Optional +from collections.abc import Sequence import numpy as np from ase.data import chemical_symbols @@ -14,7 +15,7 @@ import psiflow from psiflow.geometry import Geometry from psiflow.reference.reference import Reference, Status, get_spin_multiplicities -from psiflow.utils.parse import find_line, lines_to_array +from psiflow.utils.parse import find_line, lines_to_array, format_env_vars # costly to initialise input_parser = CP2KInputParserSimplified( @@ -40,8 +41,7 @@ def modify_input(input_dict: dict, properties: tuple) -> None: if global_dict.get("print_level") in ["SILENT", "LOW"]: global_dict["print_level"] = "MEDIUM" - if "forces" in properties: - # output forces + if "forces" in properties: # output forces global_dict["run_type"] = "ENERGY_FORCE" input_dict["force_eval"]["print"] = {"FORCES": {}} elif "energy" in properties: @@ -55,9 +55,17 @@ def modify_input(input_dict: dict, properties: tuple) -> None: global_dict["fm"] = {"type_of_matrix_multiplication": "SCALAPACK"} -def parse_output(output_str: str, properties: tuple) -> dict[str, float | np.ndarray]: +def make_bash_template(executor: str) -> str: + context = psiflow.context() + definition = context.definitions[executor] + command = f"cp {{}} cp2k.inp\n{definition.command()}" + env = format_env_vars(definition.env_vars) + return context.bash_template.format(commands=command, env=env) + + +def parse_output(stdout: str, outputs: Sequence[str]) -> dict: """Very basic output parser. Perhaps check the cp2k-output-tools package?""" - lines = output_str.split("\n") + lines = stdout.split("\n") data = {} # output status @@ -82,7 +90,7 @@ def parse_output(output_str: str, properties: tuple) -> dict[str, float | np.nda idx = find_line(lines, key, idx) data["energy"] = float(lines[idx].split()[-1]) * Ha - if "forces" not in properties: + if "forces" not in outputs: return data # read forces @@ -93,9 +101,9 @@ def parse_output(output_str: str, properties: tuple) -> dict[str, float | np.nda return data | {"forces": forces * Ha / Bohr} -@psiflow.serializable +@psiflow.register_serializable class CP2K(Reference): - executor: ClassVar[str] = "CP2K" + executor: str = "CP2K" _execute_label = "cp2k_singlepoint" input_dict: dict @@ -103,9 +111,9 @@ def __init__(self, input_str: str, **kwargs): super().__init__(**kwargs) self.input_dict = str_to_dict(input_str) modify_input(self.input_dict, self.outputs) - self._create_apps() + self.bash_template = make_bash_template(self.executor) - def compute_atomic_energy(self, element, box_size=None) -> AppFuture[float]: + def compute_atomic_energy(self, element, box_size=None) -> AppFuture: assert box_size, "CP2K expects a periodic box." return super().compute_atomic_energy(element, box_size) @@ -121,8 +129,10 @@ def get_single_atom_references(self, element: str) -> dict[int, Reference]: input_section["scf"]["ot"] = {"minimizer": "CG"} else: input_section["scf"] = {"ot": {"minimizer": "CG"}} + # necessary for oxygen calculation, at least in 2024.1 - input_section["scf"]["ignore_convergence_failure"] = "TRUE" + # TODO: not an allowed keyword in 2025.2 + # input_section["scf"]["ignore_convergence_failure"] = "TRUE" input_str = dict_to_str(input_dict) references = {} @@ -130,17 +140,9 @@ def get_single_atom_references(self, element: str) -> dict[int, Reference]: references[mult] = CP2K( input_str.format(mult=mult), outputs=self.outputs, - executor=self.executor, ) return references - def get_shell_command(self, inputs: list[File]) -> str: - command_list = [f"cp {inputs[0].filepath} cp2k.inp", self.execute_command] - return "\n".join(command_list) - - def parse_output(self, stdout: str) -> dict: - return parse_output(stdout, self.outputs) - def create_input(self, geom: Geometry) -> tuple[bool, Optional[File]]: if not geom.periodic: msg = "CP2K expects periodic boundary conditions, skipping geometry" @@ -168,3 +170,6 @@ def create_input(self, geom: Geometry) -> tuple[bool, Optional[File]]: self.input_dict["force_eval"]["subsys"] = section_copy # revert changes return True, File(file) + + def parse_output(self, stdout: str) -> dict: + return parse_output(stdout, self.outputs) \ No newline at end of file diff --git a/psiflow/reference/dummy.py b/psiflow/reference/dummy.py index eeb2baa..f31b14b 100644 --- a/psiflow/reference/dummy.py +++ b/psiflow/reference/dummy.py @@ -1,40 +1,39 @@ -from typing import Optional, Union from functools import partial import numpy as np -from parsl import File, bash_app, python_app +from parsl import File, bash_app import psiflow from psiflow.geometry import Geometry -from psiflow.reference.reference import Reference, Status -from psiflow.reference.reference import _execute, _process_output +from psiflow.reference.reference import Reference, Status, _execute -@psiflow.serializable +def make_bash_template() -> str: + template = psiflow.context().bash_template + return template.format(commands="cat {}", env="") + + +@psiflow.register_serializable class ReferenceDummy(Reference): - executor = "HTEX" + executor = "default_threads" _execute_label = "dummy_singlepoint" def __init__(self, **kwargs): super().__init__(**kwargs) - self._create_apps() - - def _create_apps(self): - # psiflow.context().definitions does not contain "default_htex" - self.execute_command = "" - self.app_execute = partial( - bash_app(_execute, executors=["default_htex"]), - reference=self, - parsl_resource_specification={}, + self.bash_template = make_bash_template() + + def get_execute_app(self): + # default_threads does not have an ExecutionDefinition + return partial( + bash_app(_execute, executors=[self.executor]), + bash_template=self.bash_template, label=self._execute_label, ) - self.app_post = partial( - python_app(_process_output, executors=["default_threads"]), - reference=self, - ) - def get_shell_command(self, inputs: list[File]) -> str: - return f"cat {inputs[0].filepath}" + def create_input(self, geom: Geometry) -> tuple[bool, File]: + with open(file := psiflow.context().new_file("dummy_", ".inp"), "w") as f: + f.write(geom.to_string()) + return True, File(file) def parse_output(self, stdout: str) -> dict: geom = Geometry.from_string(stdout) @@ -47,8 +46,3 @@ def parse_output(self, stdout: str) -> dict: if "forces" in self.outputs: data["forces"] = np.random.uniform(size=(len(geom), 3)) return data - - def create_input(self, geom: Geometry) -> tuple[bool, File]: - with open(file := psiflow.context().new_file("dummy_", ".inp"), "w") as f: - f.write(geom.to_string()) - return True, File(file) diff --git a/psiflow/reference/gpaw_.py b/psiflow/reference/gpaw_.py index e787abb..3c840a6 100644 --- a/psiflow/reference/gpaw_.py +++ b/psiflow/reference/gpaw_.py @@ -1,6 +1,7 @@ import json +import textwrap from pathlib import Path -from typing import Optional, Union, ClassVar +from collections.abc import Sequence from parsl import File from parsl.dataflow.futures import AppFuture @@ -9,17 +10,30 @@ from psiflow.geometry import Geometry from psiflow.reference.reference import Reference, Status from psiflow.utils.apps import copy_app_future -from psiflow.utils.parse import find_line from psiflow.reference._gpaw import FILEPATH, DEFAULTS, STDOUT_KEY +from psiflow.utils.parse import find_line, format_env_vars -def parse_output(stdout: str, properties: tuple[str, ...]) -> dict: +def make_bash_template(executor: str, script: str) -> str: + context = psiflow.context() + definition = context.definitions[executor] + command = f""" + cp {{}} input.json + cp {script} script_gpaw.py + {definition.command()} + """ + command = textwrap.dedent(command)[1:] + env = format_env_vars(definition.env_vars) + return context.bash_template.format(commands=command, env=env) + + +def parse_output(stdout: str, properties: Sequence[str]) -> dict: lines = stdout.split("\n") idx_start = find_line(lines, STDOUT_KEY) + 1 idx_stop = find_line(lines, STDOUT_KEY, idx_start) txt = "\n".join(lines[idx_start:idx_stop]) geom = Geometry.from_string(txt) - idx = find_line(lines, "Total:", reverse=True) + idx = find_line(lines, "Total:", reverse=True, max_lines=100) data = { "status": Status.SUCCESS, "runtime": lines[idx].split()[-2], @@ -32,9 +46,9 @@ def parse_output(stdout: str, properties: tuple[str, ...]) -> dict: return data -@psiflow.serializable +@psiflow.register_serializable class GPAW(Reference): - executor: ClassVar[str] = "GPAW" + executor: str = "GPAW" _execute_label = "gpaw_singlepoint" parameters: dict script: str @@ -44,19 +58,12 @@ def __init__(self, parameters: dict, script: str | Path = FILEPATH, **kwargs): self.parameters = parameters assert (script := Path(script)).is_file() self.script = str(script.resolve()) # absolute path - self._create_apps() + self.bash_template = make_bash_template(self.executor, self.script) + print(self.bash_template) def compute_atomic_energy(self, element, box_size=None) -> AppFuture: return copy_app_future(0.0) # GPAW computes formation energy by default - def get_shell_command(self, inputs: list[File]) -> str: - command_list = [ - f"cp {inputs[0].filepath} input.json", - f"cp {self.script} script_gpaw.py", - self.execute_command, - ] - return "\n".join(command_list) - def parse_output(self, stdout: str) -> dict: return parse_output(stdout, self.outputs) diff --git a/psiflow/reference/orca_.py b/psiflow/reference/orca_.py index cc2185b..93a67cc 100644 --- a/psiflow/reference/orca_.py +++ b/psiflow/reference/orca_.py @@ -1,7 +1,8 @@ import warnings import re from functools import partial -from typing import Optional, Union, ClassVar +from typing import Optional +from collections.abc import Sequence import ase.symbols import numpy as np @@ -12,7 +13,12 @@ import psiflow from psiflow.geometry import Geometry from psiflow.reference.reference import Reference, Status, get_spin_multiplicities -from psiflow.utils.parse import find_line, lines_to_array, str_to_timedelta +from psiflow.utils.parse import ( + find_line, + lines_to_array, + str_to_timedelta, + format_env_vars, +) KEY_GHOST = "ghost" @@ -59,7 +65,7 @@ def format_coord(geom: Geometry) -> str: return "\n".join(data) -def check_input(input_template: str, properties: tuple[str, ...]) -> str: +def check_input(input_template: str, properties: Sequence[str]) -> str: """""" research = partial(re.search, string=input_template, flags=re.IGNORECASE) @@ -83,7 +89,15 @@ def check_input(input_template: str, properties: tuple[str, ...]) -> str: return "\n".join(lines) -def parse_output(stdout: str, properties: tuple[str, ...]) -> dict: +def make_bash_template(executor: str) -> str: + context = psiflow.context() + definition = context.definitions[executor] + command = f"cp {{}} orca.inp\n{definition.command()}" + env = format_env_vars(definition.env_vars) + return context.bash_template.format(commands=command, env=env) + + +def parse_output(stdout: str, properties: Sequence[str]) -> dict: lines = stdout.split("\n") data = {} @@ -114,14 +128,14 @@ def parse_output(stdout: str, properties: tuple[str, ...]) -> dict: idx_start = idx = find_line(lines, "CARTESIAN GRADIENT", idx) + 3 idx_stop = find_line(lines, "Difference", idx) - 1 gradients = lines_to_array(lines[idx_start:idx_stop], start=3, stop=6) - data["forces"] = - gradients * Ha / Bohr + data["forces"] = -gradients * Ha / Bohr return data -@psiflow.serializable +@psiflow.register_serializable class ORCA(Reference): - executor: ClassVar[str] = "ORCA" + executor: str = "ORCA" _execute_label = "orca_singlepoint" input_template: str input_kwargs: dict @@ -129,13 +143,11 @@ class ORCA(Reference): def __init__(self, input_template: str, **kwargs): super().__init__(**kwargs) self.input_template = check_input(input_template, self.outputs) - self.input_kwargs = DEFAULT_KWARGS.copy() # TODO: user control? - self._create_apps() + self.bash_template = make_bash_template(self.executor) - def _create_apps(self): - super()._create_apps() + self.input_kwargs = DEFAULT_KWARGS.copy() # TODO: user control? definition = psiflow.context().definitions[self.executor] - wq_resources = definition.wq_resources() + wq_resources = definition.wq_resources(self.n_cores) cores, memory = wq_resources["cores"], wq_resources["memory"] self.input_kwargs |= {"cores": cores, "memory": memory // cores} # in MB @@ -155,10 +167,6 @@ def get_single_atom_references(self, element: str) -> dict[int, Reference]: references[mult] = ref return references - def get_shell_command(self, inputs: list[File]) -> str: - command_list = [f"cp {inputs[0].filepath} orca.inp", self.execute_command] - return "\n".join(command_list) - def parse_output(self, stdout: str) -> dict: return parse_output(stdout, self.outputs) diff --git a/psiflow/reference/reference.py b/psiflow/reference/reference.py index d049d9a..1a6d6d7 100644 --- a/psiflow/reference/reference.py +++ b/psiflow/reference/reference.py @@ -1,8 +1,7 @@ -from __future__ import annotations # necessary for type-guarding class methods - import warnings import logging -from typing import ClassVar, Optional, Union, Callable, Sequence +from typing import Optional, Union, Callable, Optional +from collections.abc import Sequence from pathlib import Path from functools import partial from enum import Enum @@ -14,7 +13,7 @@ from parsl.dataflow.futures import AppFuture import psiflow -from psiflow.data import Computable, Dataset +from psiflow.data import Dataset from psiflow.data.utils import extract_quantities from psiflow.geometry import Geometry, NullState from psiflow.utils.apps import copy_app_future @@ -30,21 +29,9 @@ class Status(Enum): INCONSISTENT = 2 -class SinglePointResult: - """All dict keys update_geometry understands""" - - status: Status - natoms: int # optional - positions: np.ndarray - energy: float - forces: np.ndarray # optional - stdout: Path - stderr: Path # optional - runtime: float # optional - - def update_geometry(geom: Geometry, data: dict) -> Geometry: """""" + # data should contain 'stdout' and 'status' keys task_name, task_id = get_task_name_id(data["stdout"]) logger.info(f'Task "{task_name}" (ID {task_id}): {data["status"].name}') @@ -53,7 +40,9 @@ def update_geometry(geom: Geometry, data: dict) -> Geometry: geom.order["status"], geom.order["task_id"] = data["status"].name, task_id if data["status"] != Status.SUCCESS: return geom - geom.order["runtime"] = data.get("runtime") + + # necessary keys: 'positions' and 'energy' + # optional keys: 'runtime' and 'forces' shift = data["positions"][0] - geom.per_atom.positions[0] if not np.allclose(data["positions"], geom.per_atom.positions + shift, atol=1e-6): @@ -62,6 +51,7 @@ def update_geometry(geom: Geometry, data: dict) -> Geometry: return geom geom.energy = data["energy"] + geom.order["runtime"] = data.get("runtime") if "forces" in data: geom.per_atom.forces[:] = data["forces"] return geom @@ -71,19 +61,34 @@ def update_geometry(geom: Geometry, data: dict) -> Geometry: def get_minimum_energy(element: str, **kwargs) -> AppFuture[float]: energies = {m: state.energy or np.inf for m, state in kwargs.items()} energy = min(energies.values()) - - logger.info(f"Atomic energies for element {element}") - for m, energy in energies.items(): - logger.info(f"\tMultiplicity {m}:{energy:>10.4f} eV") + msg = [ + f"\nAtomic energies for element {element}", + *[f"\tMultiplicity {m}:{energy:>10.4f} eV" for m, energy in energies.items()], + ] + logger.info('\n'.join(msg)) assert not np.isinf(energy), f"Atomic energy calculation of '{element}' failed" - return copy_app_future(energy) + return copy_app_future(energy) # TODO: why? + + +def get_spin_multiplicities(element: str) -> list[int]: + """TODO: rethink this""" + # max S = N * 1/2, max mult = 2 * S + 1 + from ase.symbols import atomic_numbers + + mults = [] + number = atomic_numbers[element] + for mult in range(1, min(number + 2, 16)): + if number % 2 == 0 and mult % 2 == 0: + continue # S always whole, mult never even + mults.append(mult) + return mults @join_app def compute_dataset( dataset: Dataset, length: int, - reference: Reference, + reference: "Reference", ) -> list[AppFuture[Geometry]]: logger.info(f"Performing {length} {reference.__class__.__name__} calculations.") geometries = dataset.geometries() # read it once @@ -92,73 +97,40 @@ def compute_dataset( def _execute( - reference: Reference, - inputs: list[File], bash_template: str, + inputs: list[File], parsl_resource_specification: Optional[dict] = None, stdout: str = parsl.AUTO_LOGNAME, stderr: str = parsl.AUTO_LOGNAME, label: str = "singlepoint", ) -> str: - # TODO: we do not set env_vars here? - command = reference.get_shell_command(inputs) - return bash_template.format(commands=command, env=">/dev/null") - - -def _process_output( - reference: Reference, - geom: Geometry, - inputs: tuple[str | int] = (), -) -> Geometry: - """""" - stdout = Path(inputs[0]).read_text() - try: - data = reference.parse_output(stdout) - except LineNotFoundError: - # TODO: find out what went wrong? - data = {"status": Status.FAILED} - data |= {"stdout": Path(inputs[0]), "stderr": Path(inputs[1])} - return update_geometry(geom, data) - - -@join_app -def evaluate(reference: Reference, geom: Geometry) -> AppFuture[Geometry]: - """""" - if geom == NullState: # TODO: remove this - warnings.warn("Skipping NullState..") - return copy_app_future(geom) - execute, *files = reference.create_input(geom=geom) - if not execute: # TODO: should we reset geom? - return copy_app_future(geom) - future = reference.app_execute(inputs=files) - future = reference.app_post( - geom=geom, inputs=[future.stdout, future.stderr, future] # wait for future - ) - return future + return bash_template.format(*inputs) -@psiflow.serializable -class Reference(Computable): - outputs: Sequence[str] - batch_size: ClassVar[int] = 1 # TODO: not really used - app_execute: ClassVar[Callable] # TODO: fix serialisation - app_post: ClassVar[Callable] - _execute_label: ClassVar[str] - execute_command: str - executor: ClassVar[str] +class Reference: + outputs: tuple[str, ...] + bash_template: str + _execute_label: str + executor: str n_cores: Optional[int] def __init__( self, outputs: Sequence[str] = ("energy", "forces"), n_cores: int | None = None ): - self.outputs: tuple[str, ...] = tuple(outputs) + self.outputs = tuple(outputs) self.n_cores = n_cores + context = psiflow.context() + definition = context.definitions[self.executor] + if (n := self.n_cores) is not None: + assert n <= definition.spec["cores"] + def compute( self, arg: Union[Dataset, Geometry, AppFuture, list], *outputs: Optional[Union[str, tuple]], ): + # TODO: deprecate? Reference evaluations are too costly for output in outputs: if output not in self.outputs: raise ValueError("output {} not in {}".format(output, self.outputs)) @@ -184,31 +156,6 @@ def compute( return future_data[0] return [future_data[_] for _ in range(len(outputs))] - def compute_dataset(self, dataset: Dataset) -> Dataset: - future = compute_dataset(dataset, dataset.length(), self) - return Dataset(future) - - def _create_apps(self): - context = psiflow.context() - definition = context.definitions[self.executor] - if (n := self.n_cores) is not None: - assert n <= definition.spec["cores"] - self.execute_command = definition.command() - self.app_execute = partial( - bash_app(_execute, executors=[self.executor]), - reference=self, - bash_template=context.bash_template, - parsl_resource_specification=definition.wq_resources(n), - label=self._execute_label, - ) - self.app_post = partial( - python_app(_process_output, executors=["default_threads"]), - reference=self, - ) - - def evaluate(self, geometry: Geometry | AppFuture) -> AppFuture: - return evaluate(self, geometry) - def compute_atomic_energy(self, element, box_size=None) -> AppFuture[float]: references = self.get_single_atom_references(element) state = Geometry.from_data( @@ -221,28 +168,75 @@ def compute_atomic_energy(self, element, box_size=None) -> AppFuture[float]: futures[str(mult)] = reference.evaluate(state) return get_minimum_energy(element, **futures) - def get_single_atom_references(self, element: str) -> dict[int, Reference]: - raise NotImplementedError + def get_execute_app(self) -> Callable: + # TODO: how often is this called? + context = psiflow.context() + definition = context.definitions[self.executor] + return partial( + bash_app(_execute, executors=[self.executor]), + bash_template=self.bash_template, + parsl_resource_specification=definition.wq_resources(self.n_cores), + label=self._execute_label, + ) - def get_shell_command(self, inputs: list[File]) -> str: - raise NotImplementedError + def evaluate(self, geometry: Geometry | AppFuture) -> AppFuture: + return evaluate(geometry, self) + + def compute_dataset(self, dataset: Dataset) -> Dataset: + future = compute_dataset(dataset, dataset.length(), self) + return Dataset(future) - def parse_output(self, stdout: str): + def get_single_atom_references(self, element: str) -> dict[int, "Reference"]: raise NotImplementedError def create_input(self, geom: Geometry) -> tuple[bool, File, ...]: raise NotImplementedError + def parse_output(self, stdout: str) -> dict: + raise NotImplementedError -def get_spin_multiplicities(element: str) -> list[int]: - """TODO: rethink this""" - # max S = N * 1/2, max mult = 2 * S + 1 - from ase.symbols import atomic_numbers - mults = [] - number = atomic_numbers[element] - for mult in range(1, min(number + 2, 16)): - if number % 2 == 0 and mult % 2 == 0: - continue # S always whole, mult never even - mults.append(mult) - return mults +def _process_output( + geom: Geometry, + reference: Reference, + inputs: Sequence[File] = (), +) -> Geometry: + """Updates geometry with ab initio labels""" + stdout = Path(inputs[0]).read_text() + try: + data = reference.parse_output(stdout) + except LineNotFoundError: + data = {"status": Status.FAILED} # TODO: find out what went wrong? + data |= {"stdout": Path(inputs[0]), "stderr": Path(inputs[1])} + return update_geometry(geom, data) + + +process_output = python_app(_process_output, executors=["default_threads"]) + + +@join_app +def evaluate( + geom: Geometry, + reference: Reference, + execute_func: Optional[Callable] = None, +) -> AppFuture: + """""" + if geom == NullState: # TODO: remove this + warnings.warn("Skipping NullState..") + return copy_app_future(geom) + + flag, *files = reference.create_input(geom=geom) + if not flag: # TODO: should we reset geom? + return copy_app_future(geom) + + # do the actual evaluation + if execute_func is None: + execute_func = reference.get_execute_app() + future = execute_func(inputs=files) + + future = process_output( + geom=geom, + reference=reference, + inputs=[future.stdout, future.stderr, future], # wait for future + ) + return future diff --git a/psiflow/sampling/metadynamics.py b/psiflow/sampling/metadynamics.py index 3635264..6411b29 100644 --- a/psiflow/sampling/metadynamics.py +++ b/psiflow/sampling/metadynamics.py @@ -9,7 +9,7 @@ from psiflow.utils.apps import copy_app_future, copy_data_future -@psiflow.serializable +@psiflow.register_serializable class Metadynamics: _plumed_input: str external: Optional[psiflow._DataFuture] diff --git a/psiflow/sampling/output.py b/psiflow/sampling/output.py index 7d74aec..9daec95 100644 --- a/psiflow/sampling/output.py +++ b/psiflow/sampling/output.py @@ -147,16 +147,16 @@ def _add_contributions( add_contributions = python_app(_add_contributions, executors=["default_threads"]) -@psiflow.serializable +@psiflow.register_serializable @dataclass class SimulationOutput: task_id: str walker: Walker - status: Union[Status, AppFuture] - state: Union[Geometry, AppFuture] + status: Status | AppFuture + state: Geometry | AppFuture data: AppFuture - time: Union[float, AppFuture] - temperature: Union[float, AppFuture] + time: float | AppFuture + temperature: float | AppFuture trajectory: Optional[Dataset] observables: list[str] diff --git a/psiflow/sampling/walker.py b/psiflow/sampling/walker.py index 42c711d..d26d243 100644 --- a/psiflow/sampling/walker.py +++ b/psiflow/sampling/walker.py @@ -65,10 +65,10 @@ def get_ensemble_kwargs(walker: "Walker") -> dict: ) -@psiflow.serializable +@psiflow.register_serializable @dataclass class Walker: - start: Union[Geometry, AppFuture] + start: Geometry | AppFuture hamiltonian: Hamiltonian = field(default_factory=lambda: Zero()) timestep: float = 0.5 temperature: Optional[float] = 300 @@ -231,7 +231,7 @@ def validate_coupling(walkers: list[Walker]): assert coupling.nwalkers == counts[i] -@psiflow.serializable +@psiflow.register_serializable class ReplicaExchange(Coupling): trial_frequency: int rescale_kinetic: bool diff --git a/psiflow/serialization.py b/psiflow/serialization.py index ec455b0..ffe28f4 100644 --- a/psiflow/serialization.py +++ b/psiflow/serialization.py @@ -1,340 +1,131 @@ -from __future__ import annotations # necessary for type-guarding class methods - -import inspect +import shutil import json from pathlib import Path -from typing import ClassVar, Optional, Union, get_args, get_origin, get_type_hints +from typing import Any, Optional, Union from collections.abc import Sequence -from dataclasses import InitVar -import typeguard -from parsl.app.app import python_app -from parsl.app.futures import DataFuture -from parsl.data_provider.files import File -from parsl.dataflow.futures import AppFuture +from parsl import File, python_app +from parsl.dataflow.futures import AppFuture, DataFuture import psiflow from psiflow.geometry import Geometry +from psiflow.utils.future import resolve_nested_futures -# TODO: this is only used in the Learning class currently - - -_DataFuture = Union[File, DataFuture] - - -class Serializable: - pass - - -def dummy(*args, **kwargs): - return None - - -def create_getter(name, kind, type_hint): - @typeguard.typechecked - def getter(self) -> type_hint: - return getattr(self, "_{}".format(kind))[name] - - return getter +# TODO: verify which attributes need to be serialized with a _to_serialize key? -def create_setter(name, kind, type_hint): - @typeguard.typechecked - def setter(self, value: type_hint) -> None: - _dict = getattr(self, "_{}".format(kind)) - _dict[name] = value +CLS_KEY = "PSIFLOW_CLS" +SKIP_INIT = "SKIP_INIT" +SERIALIZABLE_CLS = {} - return setter +_DataFuture = Union[File, DataFuture] # TODO: does not belong here -def update_init(init_func): # TODO: why not have a Mixin class instead? - def wrapper(self, *args, **kwargs): - self._geoms = {} - self._files = {} - self._attrs = {} - self._serial = {} - return init_func(self, *args, **kwargs) - - return wrapper - - -def serializable(cls): - """decorator to make class serializable""" - class_dict = dict(cls.__dict__) - for name, type_hint in get_type_hints(cls).items(): - kind = None - if get_origin(type_hint) in [Union, Optional, list, tuple]: - args = get_args(type_hint) - if (File in args) or (DataFuture in args): - kind = "files" - else: - if Geometry in args: - kind = "geoms" - else: - kind = "attrs" - for arg in args: - if inspect.isclass(arg): - if issubclass(arg, Serializable): # weird - kind = "serial" - else: - # TODO: temporary hotfixes all around - origin = get_origin(type_hint) - if origin is ClassVar: - continue # do nothing for classvars - elif origin in (dict, Sequence): - kind = "attrs" - elif isinstance(type_hint, str) and type_hint.startswith("dataclasses"): - continue - elif isinstance(type_hint, InitVar): - continue - - elif kind is not None and not inspect.isclass(type_hint): - raise ValueError( - "{} is formally not a class ({})".format(type_hint, name) - ) - elif issubclass(type_hint, Serializable): - kind = "serial" - elif type_hint is Geometry: - kind = "geoms" - else: - kind = "attrs" - getter = create_getter(name, kind, type_hint) - setter = create_setter(name, kind, type_hint) - class_dict[name] = property(getter, setter) - - if "__init__" not in class_dict: - class_dict["__init__"] = dummy - class_dict["__init__"] = update_init( - class_dict["__init__"] - ) # create _attrs / _files / _serial - - bases = cls.__mro__ - if bases is not None: - if Serializable in bases: - pass - else: - bases = (Serializable,) + bases - else: - bases = (Serializable,) - new_cls = type( - cls.__name__, - bases, - class_dict, - ) - return new_cls - - -@typeguard.typechecked -def _dump_json( - inputs: list = [], - outputs: list = [], - **kwargs, -) -> str: - import numpy as np - from parsl.dataflow.futures import AppFuture - - def convert_to_list(array): - if not type(array) is np.ndarray: - return array - as_list = [] - for item in array: - as_list.append(convert_to_list(item)) - return as_list - - def descend_and_wait(value): - if type(value) in [AppFuture, DataFuture]: - value = value.result() - if type(value) is dict: - for key in list(value.keys()): - value[key] = descend_and_wait(value[key]) - if type(value) in [list]: # do not allow futures in tuples! - for i in range(len(value)): - value[i] = descend_and_wait(value[i]) - return value - - # descend_and_wait(kwargs) - # print(kwargs) - # for name, value in kwargs.items(): - # print(name, type(value)) - - kwargs = descend_and_wait(kwargs) - - for name in list(kwargs.keys()): - value = kwargs[name] - if type(value) is np.ndarray: - value = convert_to_list(value) - kwargs[name] = value - - s = json.dumps(kwargs) - if len(outputs) > 0: - with open(outputs[0], "w") as f: - f.write(s) - return s +class SerializationError(TypeError): + pass -dump_json = python_app(_dump_json, executors=["default_threads"]) +def register_serializable(cls): + SERIALIZABLE_CLS[cls.__name__] = cls + return cls -@typeguard.typechecked def serialize( - obj: Serializable, - path_json: Optional[Path] = None, - copy_to: Optional[Path] = None, + obj: Any, path_json: Optional[Path] = None, copy_to: Optional[Path] = None ) -> AppFuture: - from psiflow.utils.apps import copy_data_future - + """JSON serialization for psiflow classes""" + outputs = [] if path_json is not None: - path_json = psiflow.resolve_and_check(path_json) - data = { - "_attrs": dict(obj._attrs), - } + outputs = [File(psiflow.resolve_and_check(path_json))] + future = resolve_nested_futures(obj) + return serialize_object(future, copy_to, outputs=outputs) - # dump_json waits for all futures in this list - inputs = ( - list(obj._attrs.values()) - + list(obj._files.values()) - # + list(obj._serial.values()) - # + list(obj._geoms.values()) - ) - # populate _files dict; - # if data futures need to be copied, this adds the copy operation to inputs - _files = {} - if copy_to is None: - for key, _file in obj._files.items(): - if _file is None: - _files[key] = None - continue - _files[key] = _file.filepath - else: - copy_to.mkdir(exist_ok=True) - for key, _file in obj._files.items(): - if _file is None: - _files[key] = None - continue - new_path = copy_to / Path(_file.filepath).name - new_file = copy_data_future( - pass_on_exist=True, # e.g. identical hamiltonians in different walkers - inputs=[_file], - outputs=[File(new_path)], - ).outputs[0] - _files[key] = new_file.filepath - inputs.append(new_file) - data["_files"] = _files +def _deserialize(json_str: str) -> Any: + """Reconstruct a psiflow object""" - # populate _serial dict; - # adds result of sub serialize calls to inputs - _serial = {} - for name, serial in obj._serial.items(): - if serial is None: # optional types - _serial[name] = None - continue - if type(serial) in [list, tuple]: - serialized = [serialize(s, path_json=None, copy_to=copy_to) for s in serial] - inputs += serialized - else: - serialized = serialize(serial, path_json=None, copy_to=copy_to) - inputs.append(serialized) - _serial[name] = serialized - data["_serial"] = _serial + return json.loads(json_str, object_hook=deserialize_hook) - # populate _geoms dict: - # generate Geometry and AppFuture[Geometry] strings - @python_app(executors=["default_threads"]) - def to_string(geometry: Optional[Geometry]) -> str: - if geometry is None: - return "" - else: - return geometry.to_string() - _geoms = {} - for key, value in obj._geoms.items(): - _geoms[key] = to_string(value) - data["_geoms"] = _geoms - inputs += list(_geoms.values()) +deserialize = python_app(_deserialize, executors=["default_threads"]) - if path_json is not None: - outputs = [File(str(path_json))] - else: - outputs = [] - return dump_json( - **{obj.__class__.__name__: data}, - inputs=inputs, - outputs=outputs, - ) +def _serialize_object( + obj: Any, + copy_to: Optional[Path], + outputs: Sequence[File], +) -> str: + """Serialize a psiflow object. It should not contain any futures.""" + try: + json_str = json.dumps(obj, cls=JSONEncoder, copy_to=copy_to) + except TypeError as e: + cls_set = set(SERIALIZABLE_CLS.keys()) or {} + msg = f"Failed to serialize with error '{e}'. Is it an instance of {cls_set}?" + raise SerializationError(msg) + if outputs: + with open(outputs[0], "w") as f: + f.write(json_str) + return json_str + + +serialize_object = python_app(_serialize_object, executors=["default_threads"]) + + +class JSONEncoder(json.JSONEncoder): + def __init__(self, *args, **kwargs): + self.copy_to = copy_to = kwargs.pop("copy_to", None) + if copy_to is not None: + copy_to.mkdir(exist_ok=True, parents=True) + super().__init__(*args, **kwargs) + + def default(self, obj: Any) -> dict[str, Any]: + """How to handle special class instances""" + name = obj.__class__.__name__ + match obj: + case File(): + return self._handle_file(obj) + case Geometry(): + return {CLS_KEY: "Geometry", "data": obj.to_string()} + case _ if name in SERIALIZABLE_CLS: # class instances + return {CLS_KEY: name} | vars(obj) + case _ if obj in SERIALIZABLE_CLS.values(): # classes + return {CLS_KEY: obj.__name__} | {SKIP_INIT: True} + case _: + return super().default(obj) # fall back to default behaviour + + def _handle_file(self, obj: File) -> dict[str, str]: + if self.copy_to is None: + path = obj.filepath + else: + path = self.copy_to / obj.filename + if path.is_file(): + pass # e.g. identical hamiltonians in different walkers + else: + shutil.copy(obj.filepath, path) + return {CLS_KEY: "File", "path": str(path)} -@typeguard.typechecked -def deserialize(data_str: str, custom_cls: Optional[list] = None): - from psiflow.data import Dataset - from psiflow.hamiltonians import ( - EinsteinCrystal, - Harmonic, - MACEHamiltonian, - MixtureHamiltonian, - PlumedHamiltonian, - Zero, - ) - from psiflow.learning import Learning - from psiflow.metrics import Metrics - from psiflow.models import MACE - # from psiflow.order_parameters import OrderParameter - from psiflow.reference import CP2K, GPAW, ORCA, ReferenceDummy - from psiflow.sampling import Metadynamics, ReplicaExchange, SimulationOutput, Walker +def deserialize_hook(data: dict) -> Any: + """Reconstruct psiflow objects. Let JSON handle the rest.""" + if CLS_KEY not in data: + return data + cls_name = data.pop(CLS_KEY) - SERIALIZABLES = {} - if custom_cls is None: - custom_cls = [] - for cls in custom_cls + [ - Dataset, - MACE, - CP2K, - GPAW, - ORCA, - ReferenceDummy, - Zero, - MACEHamiltonian, - EinsteinCrystal, - PlumedHamiltonian, - Harmonic, - MixtureHamiltonian, - Metadynamics, - # OrderParameter, - ReplicaExchange, - SimulationOutput, - Walker, - Metrics, - Learning, - ]: - SERIALIZABLES[cls.__name__] = cls + if cls_name == "File": + return File(Path(data["path"])) + elif cls_name == "Geometry": + return Geometry.from_string(data["data"]) + elif cls_name not in SERIALIZABLE_CLS: + cls_set = set(SERIALIZABLE_CLS.keys()) or {} + msg = f"Custom class '{cls_name}' not in {cls_set}. Cannot deserialize.." + raise TypeError(msg) - data = json.loads(data_str) - cls_name = list(data.keys())[0] - cls = SERIALIZABLES.get(cls_name, None) - assert cls is not None + cls = SERIALIZABLE_CLS.get(cls_name) + if data.get(SKIP_INIT): + return cls # return class instead of instance obj = cls.__new__(cls) - obj._files = {} - for key, value in data[cls_name]["_files"].items(): - if value is not None: - value = File(value) - obj._files[key] = value - obj._attrs = data[cls_name]["_attrs"] - obj._geoms = { - k: Geometry.from_string(s, natoms=None) - for k, s in data[cls_name]["_geoms"].items() - } - _serial = {} - for key, value in data[cls_name]["_serial"].items(): - if value is None: - _serial[key] = value - elif type(value) in [list, tuple]: - _serial[key] = [deserialize(v, custom_cls=custom_cls) for v in value] - else: - _serial[key] = deserialize(value, custom_cls=custom_cls) - obj._serial = _serial - if hasattr(obj, "_create_apps"): - obj._create_apps() + for k, v in data.items(): + setattr(obj, k, v) return obj diff --git a/psiflow/utils/future.py b/psiflow/utils/future.py new file mode 100644 index 0000000..1da70e5 --- /dev/null +++ b/psiflow/utils/future.py @@ -0,0 +1,67 @@ +import copy +from functools import partial +from typing import Any, Callable +from collections.abc import Sequence + +from parsl import File, python_app +from parsl.dataflow.futures import AppFuture, Future + +from psiflow.geometry import Geometry + + +CLS_TO_IGNORE = (Future, Callable, File, Geometry) + + +def traverse(obj: Any, callback: Callable) -> Any: + """ + Recursively traverses object and applies callback to every element. + Returns the (potentially modified) structure. + """ + node = callback(obj) + match obj: + case _ if isinstance(obj, CLS_TO_IGNORE): + pass # do not inspect certain classes + case dict(): + return {k: traverse(v, callback) for k, v in node.items()} + case list() | tuple() | set(): + return type(obj)(traverse(v, callback) for v in node) + case object(__dict__=data): + # filters for classes with attributes - will not work with slots + for k, v in data.items(): + setattr(node, k, traverse(v, callback)) + return node + + +def extract_futures(obj: Any) -> list[AppFuture]: + """Find all futures nested in the data tree""" + + def store(obj: Any) -> Any: + if isinstance(obj, Future): + futures.append(obj) + return obj + + futures = [] + traverse(obj, store) + return futures + + +@python_app(executors=['default_threads']) +def resolve_futures(obj: Any, inputs: Sequence = ()) -> Any: + """Replace every nested future with its result. This is blocking, so make sure they are finished.""" + + def resolve(obj: Any) -> Any: + if isinstance(obj, Future): + return obj.result() + return copy.copy(obj) # avoid weird side effects + + if not inputs: + return obj # quickly return if there are no futures + return traverse(obj, resolve) + + +def resolve_nested_futures(obj: Any) -> AppFuture: + """Resolve every nested future with its result for (mostly) arbitrary objects. + Essentially performs list[Future | Any] -> Future[list[Any]] for complex datastructures. + """ + futures = extract_futures(obj) + return resolve_futures(obj, inputs=futures) diff --git a/psiflow/utils/logging.py b/psiflow/utils/logging.py index 373a613..8ef1998 100644 --- a/psiflow/utils/logging.py +++ b/psiflow/utils/logging.py @@ -12,7 +12,7 @@ def setup_logging(file: Path, level=logging.INFO) -> None: fh = logging.FileHandler(file) formatter = logging.Formatter( - fmt='%(asctime)s [%(levelname)s] %(name)s \t %(message)s', + fmt='%(asctime)s [%(levelname)s] %(name)s - %(message)s', datefmt='%Y-%m-%d %H:%M' ) fh.setFormatter(formatter) diff --git a/psiflow/utils/parse.py b/psiflow/utils/parse.py index 9f265d1..1131efe 100644 --- a/psiflow/utils/parse.py +++ b/psiflow/utils/parse.py @@ -70,6 +70,6 @@ def get_task_name_id(logfile: str) -> tuple[str, str]: def format_env_vars(env_vars: dict) -> str: if len(env_vars) == 0: return "" - return "export" + " ".join([f"{k}={v}" for k, v in env_vars.items()]) + return "export " + " ".join([f"{k}={v}" for k, v in env_vars.items()]) diff --git a/pyproject.toml b/pyproject.toml index 61df7a5..52d8158 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "psiflow" -version = "4.0.2" +version = "4.0.3" description = "Library for developing interatomic potentials" readme = "README.md" requires-python = ">=3.10" diff --git a/tests/test_function.py b/tests/test_function.py index e9bcacf..19c15b9 100644 --- a/tests/test_function.py +++ b/tests/test_function.py @@ -5,13 +5,11 @@ from parsl.data_provider.files import File # type: ignore import psiflow -from psiflow.data import Dataset from psiflow.functions import ( EinsteinCrystalFunction, HarmonicFunction, MACEFunction, PlumedFunction, - DispersionFunction, function_from_json, ) from psiflow.hamiltonians import ( @@ -25,6 +23,7 @@ from psiflow.utils._plumed import remove_comments_printflush, set_path_in_plumed from psiflow.utils.apps import copy_app_future from psiflow.utils.io import dump_json +from psiflow.serialization import CLS_KEY def test_einstein_crystal(dataset): @@ -299,39 +298,33 @@ def test_hamiltonian_serialize(dataset): ) plumed = PlumedHamiltonian(plumed_input) data = json.loads(psiflow.serialize(einstein).result()) - assert "EinsteinCrystal" in data - assert "reference_geometry" in data["EinsteinCrystal"]["_geoms"] - einstein_ = psiflow.deserialize(json.dumps(data)) - assert np.allclose( - einstein.compute(dataset[:10], "energy").result(), - einstein_.compute(dataset[:10], "energy", batch_size=3).result(), - ) + assert data[CLS_KEY] == "EinsteinCrystal" + assert "reference_geometry" in data + einstein_ = psiflow.deserialize(json.dumps(data)).result() + energy = einstein.compute(dataset[:10], "energy") + energy_ = einstein_.compute(dataset[:10], "energy") + assert np.allclose(energy.result(), energy_.result()) mixed = 0.1 * einstein + 0.9 * plumed - assert "hamiltonians" in mixed._serial - assert "coefficients" in mixed._attrs data = json.loads(psiflow.serialize(mixed).result()) - assert "MixtureHamiltonian" in data - assert "hamiltonians" in data["MixtureHamiltonian"]["_serial"] - mixed_ = psiflow.deserialize(json.dumps(data)) - for i, h in enumerate(mixed.hamiltonians): + assert data[CLS_KEY] == "MixtureHamiltonian" + assert "hamiltonians" in data + assert "coefficients" in data + mixed_ = psiflow.deserialize(json.dumps(data)).result() + for h, h_ in zip(mixed.hamiltonians, mixed_.hamiltonians): if isinstance(h, EinsteinCrystal): - assert h.force_constant == mixed_.hamiltonians[i].force_constant - assert ( - h.reference_geometry.result() - == mixed_.hamiltonians[i].reference_geometry - ) + assert h.force_constant == h_.force_constant + assert h.reference_geometry.result() == h_.reference_geometry else: - assert h == mixed_.hamiltonians[i] - assert mixed.coefficients[i] == mixed_.coefficients[i] - assert np.allclose( - mixed.compute(dataset[:10], "energy").result(), - mixed_.compute(dataset[:10], "energy").result(), - ) + assert h == h_ + assert mixed.coefficients == mixed_.coefficients + energy = mixed.compute(dataset[:10], "energy") + energy_ = mixed_.compute(dataset[:10], "energy") + assert np.allclose(energy.result(), energy_.result()) data = json.loads(psiflow.serialize(Zero()).result()) - assert "Zero" in data - zero = psiflow.deserialize(json.dumps(data)) + assert data[CLS_KEY] == "Zero" + zero = psiflow.deserialize(json.dumps(data)).result() assert isinstance(zero, Zero) diff --git a/tests/test_models.py b/tests/test_models.py index 04275e4..831c0d0 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -11,7 +11,6 @@ def test_mace_init(mace_config, dataset): model = MACE(**mace_config) - assert "model_future" in model._files assert model.model_future is None model.initialize(dataset[:1]) assert model.model_future is not None @@ -19,12 +18,14 @@ def test_mace_init(mace_config, dataset): _config = model._config data_str = psiflow.serialize(model).result() - model = psiflow.deserialize(data_str) + model = psiflow.deserialize(data_str).result() _config_ = model._config for key, value in _config.items(): assert key in _config_ - if type(value) is not list: + if key in ("train_file", "valid_file"): + pass # these are updated by the apps + elif type(value) is not list: assert value == _config_[key] config = copy.deepcopy(mace_config) @@ -54,10 +55,10 @@ def test_mace_init(mace_config, dataset): for i in range(nstates): assert np.allclose(energies[i], evaluated[i].energy) - energies = hamiltonian.compute(dataset, "energy").result() - second = psiflow.deserialize(psiflow.serialize(hamiltonian).result()) - energies_ = second.compute(dataset, "energy").result() - assert np.allclose(energies, energies_) + second = psiflow.deserialize(psiflow.serialize(hamiltonian)).result() + energies = hamiltonian.compute(dataset, "energy") + energies_ = second.compute(dataset, "energy") + assert np.allclose(energies.result(), energies_.result()) hamiltonian = model.create_hamiltonian() model.reset() diff --git a/tests/test_reference.py b/tests/test_reference.py index a1be636..416974a 100644 --- a/tests/test_reference.py +++ b/tests/test_reference.py @@ -343,14 +343,10 @@ def test_cp2k_atomic_energies(simple_cp2k_input): def test_cp2k_serialize(simple_cp2k_input): element = "H" reference = CP2K(simple_cp2k_input, outputs=("energy",)) - assert "outputs" in reference._attrs - assert "input_dict" in reference._attrs - data = psiflow.serialize(reference).result() - reference2 = psiflow.deserialize(data) + reference2 = psiflow.deserialize(data).result() future = reference.compute_atomic_energy(element, box_size=4) future2 = reference2.compute_atomic_energy(element, box_size=4) - assert type(reference2.outputs) is list assert np.allclose(future.result(), future2.result()) diff --git a/tests/test_sampling.py b/tests/test_sampling.py index a502a6a..4a0718a 100644 --- a/tests/test_sampling.py +++ b/tests/test_sampling.py @@ -414,7 +414,7 @@ def test_walker_serialization(dataset, tmp_path): assert len(f.read()) > 0 data = [psiflow.serialize(obj, copy_to=tmp_path) for obj in walkers] - new_objects = [psiflow.deserialize(d.result()) for d in data] + new_objects = [psiflow.deserialize(d).result() for d in data] psiflow.wait() for d in data: print(d.result()) diff --git a/tests/test_serialization.py b/tests/test_serialization.py index cae19c4..cd8d225 100644 --- a/tests/test_serialization.py +++ b/tests/test_serialization.py @@ -1,113 +1,113 @@ -import json -import os from pathlib import Path -from typing import Optional, Union +from typing import Optional, Any +from dataclasses import dataclass import pytest -import typeguard -from parsl.data_provider.files import File -from parsl.dataflow.futures import AppFuture +from parsl import File +from parsl.dataflow.futures import Future import psiflow -from psiflow.data import Dataset -from psiflow.geometry import Geometry, NullState, new_nullstate -from psiflow.utils.apps import copy_app_future +from psiflow.geometry import Geometry +from psiflow.serialization import ( + SERIALIZABLE_CLS, + register_serializable, + SerializationError, +) +from psiflow.utils.future import resolve_nested_futures + + +@register_serializable +@dataclass +class Custom1: + foo: Any + bar: list[Any] + baz: dict[str, Any] + bam: Future + boa: Optional[Geometry] = None + + +@register_serializable +class Custom2(Custom1): + pass # just to mix classes + + +@dataclass +class Custom3: # not marked as serializable + foo: Any + + +def wrap_in_future(obj: Any) -> Future[Any]: + future = Future() + future.set_result(obj) + return future + + +def test_resolve_futures(): + """Verify that resolve_nested_futures works as expected.""" + instance = Custom1( + foo=Custom3(wrap_in_future(42)), + bar=[42, None, File(""), wrap_in_future(42)], + baz={"1": "blankets", "2": None, "3": File(""), "4": wrap_in_future(42)}, + bam=wrap_in_future(Custom2), + ) + out = resolve_nested_futures(instance).result() + + # check for side effects + futures = [instance.bam, instance.bar[-1], instance.baz["4"], instance.foo.foo] + assert all(isinstance(f, Future) for f in futures) + assert isinstance(instance.bam, Future) + assert isinstance(instance.bar[-1], Future) + assert isinstance(instance.baz["4"], Future) + assert isinstance(instance.foo.foo, Future) + assert instance.bar[0] == 42 and instance.baz["1"] == "blankets" + + # check resolved futures + assert out.bam == Custom2 + assert out.bar[-1] == 42 + assert out.baz["4"] == 42 + assert out.foo.foo == 42 + assert out.bar[0] == 42 and out.baz["1"] == "blankets" def test_serial_simple(tmp_path): - @psiflow.serializable - class SomeSerial: - pass - - @typeguard.typechecked - class Test: - foo: int - bar: psiflow._DataFuture - baz: Union[float, str] - bam: Optional[SomeSerial] - bao: SomeSerial - bap: list[SomeSerial, ...] - baq: Union[Geometry, AppFuture] - bas: Geometry - - def __init__(self, **kwargs): - for key, value in kwargs.items(): - setattr(self, key, value) - - new_cls = psiflow.serializable(Test) - instance = new_cls( - foo=3, - bar=File("asdfl"), - baz="asdflk", - bam=None, - bao=SomeSerial(), - bap=[SomeSerial(), SomeSerial()], - baq=copy_app_future(NullState), - bas=new_nullstate(), + """""" + assert "Custom1" in SERIALIZABLE_CLS + assert "Custom2" in SERIALIZABLE_CLS + assert "Custom3" not in SERIALIZABLE_CLS + + file1 = File(tmp_path / "1.log") + file2 = File(tmp_path / "2.log") + Path(file1.filepath).touch() + Path(file2.filepath).touch() + instance = Custom1( + foo=Custom3(wrap_in_future(42)), + bar=[42, None, file1, wrap_in_future(42)], + baz={"1": "blankets", "2": None, "3": file2, "4": wrap_in_future(42)}, + bam=wrap_in_future(Custom2), ) - assert instance.foo == 3 - assert instance._attrs["foo"] == 3 - - # test independence - instance._attrs["test"] = 1 - instance_ = new_cls(foo=4, bar=File("asdfl")) - assert "test" not in instance_._attrs - assert instance_.foo == 4 - assert instance.foo == 3 - - assert tuple(instance._files.keys()) == ("bar",) - assert tuple(instance._attrs.keys()) == ("foo", "baz", "test") - assert tuple(instance._serial.keys()) == ("bam", "bao", "bap") - assert type(instance._serial["bap"]) is list - assert len(instance._serial["bap"]) == 2 - assert len(instance._geoms) == 2 - assert "baq" in instance._geoms - assert "bas" in instance._geoms - - # serialization/deserialization of 'complex' Test instance - json_dump = psiflow.serialize(instance).result() - instance_ = psiflow.deserialize(json_dump, custom_cls=[new_cls, SomeSerial]) - - assert instance.foo == instance_.foo - assert instance.bar.filepath == instance_.bar.filepath - assert instance.baz == instance_.baz - assert instance.bam == instance_.bam - assert type(instance_.bao) is SomeSerial - assert len(instance_.bap) == 2 - assert type(instance_.bap[0]) is SomeSerial - assert type(instance_.bap[1]) is SomeSerial - assert id(instance) != id(instance_) - assert isinstance(instance_.baq, Geometry) - assert instance_.baq == NullState - assert instance_.bas == NullState - - # check classes created before test execution, e.g. Dataset - data = Dataset([NullState]) - assert "extxyz" in data._files - assert len(data._attrs) == 0 - assert len(data._serial) == 0 - with pytest.raises(typeguard.TypeCheckError): # try something stupid - data.extxyz = 0 - - # test getter / setter - data.extxyz = File("some_file") - assert type(data.extxyz) is File + with pytest.raises(SerializationError): # cannot do Custom3 + psiflow.serialize(instance).result() + SERIALIZABLE_CLS[Custom3.__name__] = Custom3 # can do Custom3 # test basic serialization - dumped_json = psiflow.serialize(data).result() - assert "Dataset" in dumped_json - data_dict = json.loads(dumped_json) - assert len(data_dict["Dataset"]["_attrs"]) == 0 - assert len(data_dict["Dataset"]["_serial"]) == 0 - assert len(data_dict["Dataset"]["_files"]) == 1 - assert data_dict["Dataset"]["_files"]["extxyz"] == data.extxyz.filepath + future = psiflow.serialize(instance) + json_str = future.result() + assert isinstance(json_str, str) + instance1 = psiflow.deserialize(future).result() + instance2 = resolve_nested_futures(instance).result() + assert type(instance1) == type(instance2) + assert instance1.foo == instance2.foo + assert instance1.bam == instance2.bam + file1, file2 = instance1.bar.pop(2), instance2.bar.pop(2) + assert file1.filepath == file2.filepath + assert instance1.bar == instance2.bar + file1, file2 = instance1.baz.pop("3"), instance2.baz.pop("3") + assert file1.filepath == file2.filepath + assert instance1.baz == instance2.baz # test copy_to serialization - data = Dataset([NullState]) - data.extxyz.result() - filename = Path(data.extxyz.filepath).name - assert os.path.exists(data.extxyz.filepath) - dumped_json = psiflow.serialize(data, copy_to=tmp_path / "test").result() - os.remove(data.extxyz.filepath) - assert (tmp_path / "test").exists() - assert (tmp_path / "test" / filename).exists() # new file + copy_dir = tmp_path / "dir" + psiflow.serialize(instance, copy_to=copy_dir).result() + assert copy_dir.is_dir() + assert (copy_dir / "1.log").is_file() + assert (copy_dir / "2.log").is_file()