diff --git a/configs/example_local_debug.yaml b/configs/example_local_debug.yaml new file mode 100644 index 00000000..385e0e4f --- /dev/null +++ b/configs/example_local_debug.yaml @@ -0,0 +1,15 @@ +# log all of the messages +parsl_log_level: DEBUG +psiflow_log_level: DEBUG + +# tell Parsl to not clean up completed tasks from the DFK +# you can print out the full list of tasks with 'log_dfk_tasks' from psiflow.utils.logging +garbage_collect: false + +# tell psiflow to execute every bash app in a specified directory - and not clean up afterwards +# allows you to track which files are created in each task +tmpdir_root: ~/psiflow_tasks +keep_tmpdirs: true + +# not specifying ModelEvaluation or ModelTraining falls back to the default in 'psiflow.utils.config' +# this does not include any ReferenceEvaluation blocks, so Reference apps will not run with this configuration diff --git a/configs/example_tier0_lumi.yaml b/configs/example_tier0_lumi.yaml new file mode 100644 index 00000000..13a63282 --- /dev/null +++ b/configs/example_tier0_lumi.yaml @@ -0,0 +1,21 @@ +# Tier-0 LUMI requires an active research project to submit calculations + +# run ModelEvaluation and ModelTraining in a psiflow container +container: + uri: oras://ghcr.io/molmod/psiflow:4.0.0_rocm6.2 # outdated uri + engine: singularity # LUMI uses singularity instead of apptainer + gpu_flavour: rocm # GPU nodes are of the AMD kind + +# not specifying max_runtime defaults to (just shy of) job walltime +ModelTraining: + cores_per_task: 56 + gpus_per_task: 8 + slurm: + partition: standard-g + account: project_465001125 + nodes_per_block: 1 + cores_per_node: 56 + gpus_per_node: 8 + max_blocks: 1 + walltime: "12:00:00" + scheduler_options: "#SBATCH --clusters=dodrio\n#SBATCH --gpus=1\n" diff --git a/configs/example_tier1_hortense.yaml b/configs/example_tier1_hortense.yaml new file mode 100644 index 00000000..8bfe95c0 --- /dev/null +++ b/configs/example_tier1_hortense.yaml @@ -0,0 +1,19 @@ +# Tier-1 Hortense requires an active research project to submit calculations +# always make sure to 'unset SBATCH_PARTITION' before submitting anything + +# when not using a psiflow container, you need to make sure all necessary software is available on the worker node +# this can be through installed modules, custom environments, ... +ModelEvaluation: + cores_per_task: 8 + gpus_per_task: 1 + max_runtime: 06:00:00 + slurm: + partition: gpu_rome_a100 # specify node partition where jobs should run + account: 2026_042 # specify your active computational grant + nodes_per_block: 1 + cores_per_node: 32 + gpus_per_block: 4 + max_blocks: 10 + walltime: 06:30:00 + worker_init: micromamba activate my-mace-env # load an environment with the appropriate software + diff --git a/configs/example_tier2_ugent.yaml b/configs/example_tier2_ugent.yaml new file mode 100644 index 00000000..bc87a77d --- /dev/null +++ b/configs/example_tier2_ugent.yaml @@ -0,0 +1,59 @@ +# Tier-2 UGent divides its compute nodes over 'clusters' rather than partitions +# always make sure to 'unset SLURM_CLUSTERS' before submitting anything + +# retry failed apps once +retries: 1 + +# psiflow always launches one HighThroughputExecutor and one ThreadPoolExecutor to handle internal apps (IO, parsing) +# these tasks run locally and should never do any real computational work +# default_threads needs to mainly be high enough to avoid concurrency bottlenecks +default_threads: 8 + +# run ModelEvaluation and ModelTraining in a psiflow container +container: + uri: oras://ghcr.io/molmod/psiflow:4.0.0_cu118 # outdated uri + engine: apptainer + gpu_flavour: cuda # required for GPU usage + +# tasks will use (at least) two cores (no GPUs) and can run for six hours at most +# parsl asks SLURM for 16 cores on either doduo or shinx, so 8 tasks can run per SLURM job +# only 10 SLURM jobs will run at once +ModelEvaluation: + cores_per_task: 2 + max_runtime: 06:00:00 + slurm: + nodes_per_block: 1 + cores_per_node: 16 + max_blocks: 10 + walltime: 08:00:00 + clusters: doduo,shinx + +# tasks will use 8 cores and one GPU, running for four hours at most +# parsl asks SLURM for 12 cores + 1 GPU on accelgor, so one task can run per SLURM job +ModelTraining: + cores_per_task: 12 + gpus_per_task: 1 + max_runtime: 04:00:00 + slurm: + nodes_per_block: 1 + cores_per_node: 12 + gpus_per_node: 1 + walltime: 04:00:00 + clusters: accelgor + +# tasks will use 32 cores (by default), running for one hour at most +# tasks will be killed when they exceed 64 GB of memory usage +# parsl asks SLURM for 32 cores on doduo or shinx, so one task can run per SLURM job +# tells WQ to load a CP2K module before trying to launch calculations +CP2K: + cores_per_task: 32 + max_runtime: 01:00:00 + memory_limit: 64 + slurm: + nodes_per_block: 1 + cores_per_node: 32 + mem_per_node: 64 + walltime: 04:00:00 + max_blocks: 50 + clusters: doduo,shinx + worker_init: ml CP2K/2023.1-foss-2023a diff --git a/configs/local_test.yaml b/configs/local_test.yaml index 3f9dc2bf..9381058a 100644 --- a/configs/local_test.yaml +++ b/configs/local_test.yaml @@ -1,32 +1,31 @@ ---- -parsl_log_level: WARNING -retries: 0 -make_symlinks: false +psiflow_log_level: INFO ModelEvaluation: - gpu: false - use_threadpool: false - max_simulation_time: 1 - + executor: threadpool + max_threads: 4 + max_runtime: 00:00:20 + ModelTraining: - gpu: true - use_threadpool: true - max_training_time: 1 - max_workers: 1 # suppress assertion for multigpu training - + executor: threadpool + max_threads: 4 + max_runtime: 00:00:20 + CP2K: - cores_per_worker: 1 - max_evaluation_time: 0.1 - container_uri: 'oras://ghcr.io/molmod/cp2k:2024.1' + executor: workqueue + cores_per_task: 2 + max_runtime: 00:00:20 + memory_limit: 2 + container: + uri: docker://cp2k/cp2k:2025.2_mpich_x86_64_psmp GPAW: - cores_per_worker: 1 - max_evaluation_time: 0.1 - container_uri: 'oras://ghcr.io/molmod/gpaw:24.1' + executor: workqueue + cores_per_task: 2 + max_runtime: 00:00:20 + container: + uri: oras://ghcr.io/molmod/gpaw:24.1 ORCA: - cores_per_worker: 1 - max_evaluation_time: 0.1 - + executor: workqueue + cores_per_task: 2 -... diff --git a/configs/old_hortense.yaml b/configs/old_hortense.yaml deleted file mode 100644 index d6ccc681..00000000 --- a/configs/old_hortense.yaml +++ /dev/null @@ -1,42 +0,0 @@ ---- -parsl_log_level: WARNING -container_engine: 'apptainer' -container_uri: 'oras://ghcr.io/molmod/psiflow:4.0.0_cu118' -default_threads: 8 -ModelEvaluation: - cores_per_worker: 12 - gpu: True - max_simulation_time: 20 - slurm: - partition: "gpu_rome_a100" - account: "2023_070" - nodes_per_block: 1 - cores_per_node: 48 - max_blocks: 1 - walltime: "12:00:00" - scheduler_options: "#SBATCH --clusters=dodrio\n#SBATCH --gpus=4\n" -ModelTraining: - cores_per_worker: 12 - gpu: true - max_training_time: 40 - slurm: - partition: "gpu_rome_a100" - account: "2023_070" - nodes_per_block: 1 - cores_per_node: 12 - max_blocks: 1 - walltime: "12:00:00" - scheduler_options: "#SBATCH --clusters=dodrio\n#SBATCH --gpus=1\n" -CP2K: - cores_per_worker: 64 - max_evaluation_time: 30 - launch_command: 'apptainer exec -e --no-init oras://ghcr.io/molmod/cp2k:2024.1 /opt/entry.sh mpirun -np 32 -bind-to core cp2k.psmp' - slurm: - partition: "cpu_rome" - account: "2024_079" - nodes_per_block: 1 - cores_per_node: 64 - max_blocks: 25 - walltime: "06:00:00" - scheduler_options: "#SBATCH --clusters=dodrio\n" -... diff --git a/configs/old_lumi.yaml b/configs/old_lumi.yaml deleted file mode 100644 index b5e9a14c..00000000 --- a/configs/old_lumi.yaml +++ /dev/null @@ -1,39 +0,0 @@ ---- -parsl_log_level: WARNING -container_engine: 'singularity' -container_uri: 'oras://ghcr.io/molmod/psiflow:4.0.0_rocm6.2' -default_threads: 8 -CP2K: - cores_per_worker: 32 - max_evaluation_time: 20 - launch_command: 'singularity exec -e --no-init oras://ghcr.io/molmod/cp2k:2024.1 /opt/entry.sh mpirun -np 32 cp2k.psmp' - slurm: - partition: "standard" - account: "project_465001125" - nodes_per_block: 1 - cores_per_node: 128 - max_blocks: 10 - walltime: "01:00:00" -ModelEvaluation: - cores_per_worker: 7 - gpu: True - slurm: - partition: "standard-g" - account: "project_465001125" - nodes_per_block: 1 - cores_per_node: 56 - max_blocks: 10 - walltime: "01:00:00" - scheduler_options: "#SBATCH --gres=gpu:8\n" -ModelTraining: - cores_per_worker: 7 - gpu: true - multigpu: true - slurm: - partition: "standard-g" - account: "project_465001125" - nodes_per_block: 1 - cores_per_node: 56 - walltime: "01:00:00" - scheduler_options: "#SBATCH --gres=gpu:8\n" -... diff --git a/configs/old_threadpool.yaml b/configs/old_threadpool.yaml deleted file mode 100644 index 33f6c572..00000000 --- a/configs/old_threadpool.yaml +++ /dev/null @@ -1,29 +0,0 @@ ---- -parsl_log_level: WARNING -retries: 0 -ModelEvaluation: - gpu: false - use_threadpool: true - max_simulation_time: 0.4 -ModelTraining: - gpu: true - use_threadpool: true - max_training_time: 1 - max_workers: 1 # suppress assertion for multigpu training -CP2K: - cores_per_worker: 2 - max_evaluation_time: 0.3 - launch_command: 'apptainer exec -e --no-init oras://ghcr.io/molmod/cp2k:2024.1 /opt/entry.sh mpirun -bind-to core -np 2 -env OMP_NUM_THREADS 1 cp2k.psmp' -CP2K_container: - cores_per_worker: 2 - max_evaluation_time: 0.3 - launch_command: 'apptainer exec -e --no-init oras://ghcr.io/molmod/cp2k:2024.1 /opt/entry.sh mpirun -bind-to core -np 2 -env OMP_NUM_THREADS 1 cp2k.psmp' -GPAW: - cores_per_worker: 2 - max_evaluation_time: 0.3 - launch_command: 'apptainer exec -e --no-init oras://ghcr.io/molmod/gpaw:24.1 /opt/entry.sh mpirun -np 2 gpaw python /opt/run_gpaw.py' -GPAW_container: - cores_per_worker: 2 - max_evaluation_time: 0.3 - launch_command: 'apptainer exec -e --no-init oras://ghcr.io/molmod/gpaw:24.1 /opt/entry.sh mpirun -np 2 gpaw python /opt/run_gpaw.py' -... diff --git a/configs/old_wq.yaml b/configs/old_wq.yaml deleted file mode 100644 index 660d7843..00000000 --- a/configs/old_wq.yaml +++ /dev/null @@ -1,17 +0,0 @@ ---- -parsl_log_level: WARNING -default_threads: 4 -ModelEvaluation: - cores_per_worker: 4 - gpu: True - max_simulation_time: 0.4 -ModelTraining: - cores_per_worker: 4 - gpu: true - max_training_time: 1 - max_workers: 1 -CP2K: - cores_per_worker: 2 - max_evaluation_time: 0.3 - launch_command: 'apptainer exec -e --no-init oras://ghcr.io/molmod/cp2k:2023.2 /opt/entry.sh mpirun -np 2 -x OMP_NUM_THREADS=1 cp2k.psmp' -... diff --git a/psiflow/__init__.py b/psiflow/__init__.py index e8de3e30..73f2e3fd 100644 --- a/psiflow/__init__.py +++ b/psiflow/__init__.py @@ -1,7 +1,5 @@ from pathlib import Path -import typeguard - from .config import setup_slurm_config # noqa: F401 from .execution import ExecutionContextLoader from .serialization import ( # noqa: F401 @@ -12,7 +10,6 @@ ) -@typeguard.typechecked def resolve_and_check(path: Path) -> Path: path = path.resolve() if Path.cwd() in path.parents: diff --git a/psiflow/data/dataset.py b/psiflow/data/dataset.py index a2e866e9..660928ca 100644 --- a/psiflow/data/dataset.py +++ b/psiflow/data/dataset.py @@ -9,11 +9,11 @@ from parsl.app.app import join_app, python_app from parsl.app.python import PythonApp from parsl.data_provider.files import File -from parsl.dataflow.futures import AppFuture +from parsl.dataflow.futures import AppFuture, DataFuture import psiflow from psiflow.geometry import QUANTITIES, Geometry -from psiflow.utils.apps import combine_futures, copy_data_future, unpack_i +from psiflow.utils.apps import copy_data_future, pack from .utils import ( align_axes, @@ -118,7 +118,7 @@ def __getitem__( inputs=[self.extxyz], outputs=[], # will return Geometry as Future ) - return unpack_i(future, 0) + return future[0] else: # slice, list, AppFuture extxyz = read_frames( index, @@ -127,7 +127,7 @@ def __getitem__( ).outputs[0] return Dataset(None, extxyz) - def save(self, path: Union[Path, str]) -> AppFuture: + def save(self, path: Union[Path, str]) -> DataFuture: """ Save the dataset to a file. @@ -135,13 +135,14 @@ def save(self, path: Union[Path, str]) -> AppFuture: path: Path to save the dataset. Returns: - AppFuture: Future representing the completion of the save operation. + DataFuture: Future representing the file to which will be saved. """ path = psiflow.resolve_and_check(Path(path)) - _ = copy_data_future( + future = copy_data_future( inputs=[self.extxyz], outputs=[File(str(path))], ) + return future.outputs[0] def geometries(self) -> AppFuture: """ @@ -265,9 +266,9 @@ def get( inputs=[self.extxyz], ) if len(quantities) == 1: - return unpack_i(result, 0) + return result[0] else: - return tuple([unpack_i(result, i) for i in range(len(quantities))]) + return tuple([result[i] for i in range(len(quantities))]) def evaluate( self, @@ -300,7 +301,7 @@ def evaluate( outputs = [outputs] future = insert_quantities( quantities=tuple(computable.outputs), - arrays=combine_futures(inputs=list(outputs)), + arrays=pack(*outputs), inputs=[self.extxyz], outputs=[psiflow.context().new_file("data_", ".xyz")], ) diff --git a/psiflow/data/utils.py b/psiflow/data/utils.py index 9078de13..0f549365 100644 --- a/psiflow/data/utils.py +++ b/psiflow/data/utils.py @@ -1,6 +1,6 @@ import re import shutil -from typing import Optional, Union +from typing import Optional, Union, Sequence import numpy as np import typeguard @@ -9,7 +9,6 @@ from parsl.dataflow.futures import AppFuture from psiflow.geometry import Geometry, NullState, _assign_identifier, create_outputs -from psiflow.utils.apps import unpack_i @typeguard.typechecked @@ -206,7 +205,7 @@ def _extract_quantities( @typeguard.typechecked def _insert_quantities( quantities: tuple[str, ...], - arrays: list[np.ndarray, ...], + arrays: Sequence[np.ndarray], data: Optional[list[Geometry]] = None, inputs: list = [], outputs: list = [], @@ -761,7 +760,7 @@ def get_train_valid_indices( tuple[AppFuture, AppFuture]: Futures for training and validation indices. """ future = train_valid_indices(effective_nstates, train_valid_split, shuffle) - return unpack_i(future, 0), unpack_i(future, 1) + return future[0], future[1] @typeguard.typechecked diff --git a/psiflow/execution.py b/psiflow/execution.py index c632935a..4d4aaef2 100644 --- a/psiflow/execution.py +++ b/psiflow/execution.py @@ -1,25 +1,19 @@ -from __future__ import annotations # necessary for type-guarding class methods - import logging -import math -import re import shutil import sys +import subprocess +import inspect from dataclasses import dataclass from pathlib import Path from threading import Lock - -# see https://stackoverflow.com/questions/59904631/python-class-constants-in-dataclasses -from typing import Any, Optional, Union, ClassVar, Protocol, Iterable +from typing import Any, Optional, Union, ClassVar, Protocol, Sequence, TypeVar import parsl import psutil -import pytimeparse -import typeguard import yaml from parsl.config import Config from parsl.data_provider.files import File -from parsl.executors import ( # WorkQueueExecutor, +from parsl.executors import ( HighThroughputExecutor, ThreadPoolExecutor, WorkQueueExecutor, @@ -30,478 +24,588 @@ from parsl.providers import LocalProvider, SlurmProvider from parsl.providers.base import ExecutionProvider -import psiflow +from psiflow.utils.config import ( + PSIFLOW_INTERNAL, + PARSL_LOGFILE, + PSIFLOW_LOGFILE, + DEFAULT_CONFIG, + CONTEXT_DIR, +) +from psiflow.utils.logging import setup_logging +from psiflow.utils.wq import register_definition +from psiflow.utils.apps import create_bash_template +from psiflow.utils.parse import str_to_timedelta + logger = logging.getLogger(__name__) # logging per module -PSIFLOW_INTERNAL = "psiflow_internal" +class ConfigurationError(ValueError): + pass # some global psiflow configuration option does not make sense -EXECUTION_KWARGS = ( - "container_uri", - "container_engine", - "container_addopts", - "container_entrypoint", -) + +def ensure( + *conditions: bool, + msg: str = "Whoopsie", + msgs: Sequence[str] = (), + template: str = "{}", +) -> None: + """Small helper function to replace 'assert' statements""" + if all(conditions): + return + if len(msgs) == 0: + raise ConfigurationError(msg) + msg = msgs[conditions.index(False)] + raise ConfigurationError(template.format(msg)) @dataclass class ContainerSpec: + """Controls container configuration""" + uri: str engine: str = "apptainer" addopts: str = " --no-eval -e --no-mount home -W /tmp --writable-tmpfs" - entrypoint: str = "/opt/entry.sh" + gpu_flavour: str | None = None def __post_init__(self): - assert self.engine in ("apptainer", "singularity") - assert len(self.uri) > 0 + ensure( + self.engine in ("apptainer", "singularity"), + len(self.uri) > 0, + self.gpu_flavour in ("cuda", "rocm", None), + msg="Invalid container configuration", + ) - def launch_command(self, gpu: bool = False) -> str: - # TODO: pretty sure some of this is overkill + def launch_command(self) -> str: pwd = Path.cwd().resolve() # access to data / internal dir - args = [self.engine, "exec", self.addopts, f"--bind {pwd}"] - if gpu: - if "rocm" in self.uri: - args.append("--rocm") - else: - args.append("--nv") - args += [self.uri, self.entrypoint] - return " ".join(args) - - @staticmethod - def from_kwargs(kwargs: dict) -> Optional[ContainerSpec]: - if "container_uri" not in kwargs: - return None - keys = ( - "container_uri", - "container_engine", - "container_addopts", - "container_entrypoint", - ) - args = [kwargs[key] for key in keys if key in kwargs] - return ContainerSpec(*args) # TODO: slightly hacky + gpu = {"cuda": "--nv", "rocm": "--rocm"}.get(self.gpu_flavour, "") + return f"{self.engine} run {self.addopts} {gpu} --bind {pwd} {self.uri}" + + +class ReferenceSpec(Protocol): + """Defines default options for Reference implementations""" + + name: ClassVar[str] + reference_args: ClassVar[tuple[str, ...]] + mpi_command: str + mpi_args: Sequence[str] + executable: str + + def launch_command(self) -> str: + raise NotImplementedError + + +@dataclass +class CP2KReferenceSpec(ReferenceSpec): + name = "CP2K" + reference_args = ("cores_per_task",) + mpi_command: str = "mpiexec -n {cores_per_task}" + mpi_args: Sequence[str] = ( + "-genv OMP_NUM_THREADS=1", + "--bind-to core", + "--map-by core", + ) + executable: str = "cp2k.psmp -i cp2k.inp" + + def launch_command(self): + return " ".join([self.mpi_command, *self.mpi_args, self.executable]) + + +@dataclass +class GPAWReferenceSpec(ReferenceSpec): + name = "GPAW" + reference_args = ("cores_per_task",) + mpi_command: str = "mpirun -np {cores_per_task}" + mpi_args: Sequence[str] = ( + "-x OMP_NUM_THREADS=1", + "--bind-to core", + "--map-by core", + ) + executable: str = "gpaw python script_gpaw.py input.json" + + def launch_command(self): + return " ".join([self.mpi_command, *self.mpi_args, self.executable]) + + +@dataclass +class ORCAReferenceSpec(ReferenceSpec): + name = "ORCA" + reference_args = () + mpi_command: str = "" + mpi_args: Sequence[str] = ( + "-x OMP_NUM_THREADS=1", + "--bind-to core", + "--map-by core", + ) + executable: str = "$(which orca) orca.inp" + + def launch_command(self): + mpi_str = " ".join(self.mpi_args) + return f'{self.executable} "{mpi_str}"' + + +REFERENCE_SPECS = { + "CP2K": CP2KReferenceSpec, + "GPAW": GPAWReferenceSpec, + "ORCA": ORCAReferenceSpec, +} + + +def make_slurm_provider(kwargs: dict) -> tuple[SlurmProvider, dict]: + defaults = {"init_blocks": 0, "exclusive": False} + required = ("cores_per_node", "walltime") + kwargs = defaults | kwargs + ensure(all(key in kwargs for key in required)) + provider = SlurmProvider(**kwargs) # does not configure Launcher + resources = { + "nodes": provider.nodes_per_block, + "cores": provider.cores_per_node, + "memory": provider.mem_per_node or float("inf"), + "gpus": provider.gpus_per_node or 0, + "lifetime": str_to_timedelta(provider.walltime).seconds, + } + return provider, resources + + +def make_local_provider(kwargs: dict) -> tuple[LocalProvider, dict]: + resources = { + "nodes": 1, + "cores": kwargs.get("cores", psutil.cpu_count(logical=False)), + "memory": kwargs.get( + "memory", psutil.virtual_memory().available / 1e9 + ), # TODO: available? + "lifetime": float("inf"), + } + if "gpus" in kwargs: + resources["gpus"] = kwargs["gpus"] + else: + out = "" + try: + out = subprocess.check_output( + "nvidia-smi -L || amd-smi list", + shell=True, + text=True, + stderr=subprocess.DEVNULL, + ) + except subprocess.CalledProcessError: + pass # nvidia-sm and amd-smi not found TODO: not properly tested + resources["gpus"] = out.count("\n") + provider = LocalProvider(init_blocks=0) + return provider, resources + + +def make_default_executors( + max_workers: int, path: Path, container: ContainerSpec +) -> tuple[HighThroughputExecutor, ThreadPoolExecutor]: + """Construct executors for internal app handling""" + launcher = SimpleLauncher() + if container is not None: + launcher = WrappedLauncher(prepend=container.launch_command()) + + htex = HighThroughputExecutor( + label="default_htex", + working_dir=str(path / "default_htex"), + cores_per_worker=1, + max_workers_per_node=max_workers, + cpu_affinity="none", + provider=LocalProvider(launcher=launcher, init_blocks=0), + ) + threadpool = ThreadPoolExecutor( + label="default_threads", + max_threads=max_workers, + working_dir=str(path), + ) + return htex, threadpool -@typeguard.typechecked class ExecutionDefinition: def __init__( self, - parsl_provider: ExecutionProvider, - gpu: bool, - cores_per_worker: int, - use_threadpool: bool, + provider: ExecutionProvider | None, + executor_type: str, + executor_kwargs: dict, + resources: dict, container: Optional[ContainerSpec], - max_workers: Optional[int] = None, + max_runtime: str | None = None, + env_vars: Optional[dict[str, str]] = None, **kwargs, - ) -> None: - self.parsl_provider = parsl_provider - self.gpu = gpu - self.cores_per_worker = cores_per_worker - self.use_threadpool = use_threadpool + ): + self.provider = provider + self.executor_type = executor_type + self.kwargs = executor_kwargs + self.resources = resources # compute per node self.container = container - self._max_workers = max_workers + + if self.use_gpu: + ensure( + resources["gpus"] > 0, msg="GPU usage requested but no GPUs available" + ) + ensure( + container is None or container.gpu_flavour is not None, + msg="Provide container 'gpu_flavour' to choose between CUDA and ROCM", + ) + + if self.executor_type == "workqueue": + # WQ-specific checks + ensure( + self.kwargs["gpus_per_task"] <= resources["gpus"], + self.kwargs["cores_per_task"] <= resources["cores"], + self.kwargs["mem_per_task"] <= resources["memory"], + msgs=["GPUs", "cores", "memory"], + template="Apps will request more {} than available per Parsl block", + ) + + # how long can individual tasks run (in seconds) + if max_runtime is None: + # allow some margin for task cleanup + max_runtime = max(0.9 * self.lifetime, self.lifetime - 60) + else: + max_runtime = str_to_timedelta(max_runtime).seconds + if max_runtime != float("inf") and max_runtime >= self.lifetime: + msg = "Allowed task runtime exceeds provider walltime. Tasks might get killed by the scheduler." + logger.warning(msg) + self.max_runtime = max_runtime + if ( + self.executor_type == "workqueue" + and self.kwargs["min_runtime"] >= self.max_runtime + ): + msg = "Minimum task runtime exceeds maximum runtime. WQ might not not start tasks." + logger.warning(msg) + + # set default WQ resource specs + self.spec: dict | None = None + if self.executor_type == "workqueue": + self.spec = { + "cores": self.kwargs["cores_per_task"], + "memory": int(self.kwargs["mem_per_task"] * 1000), # in MB + "gpus": self.kwargs["gpus_per_task"], + "disk": 0, # not implemented + "running_time_min": self.kwargs["min_runtime"], + } + register_definition(definition=self) + + # TODO: how to handle env variables? + # disable thread affinity and busy-idling until we can isolate task resources + default_env_vars = { + "OMP_PROC_BIND": "FALSE", + "OMP_WAIT_POLICY": "PASSIVE", + "OMP_DISPLAY_ENV": "VERBOSE", # verbose OMP log + } + if self.executor_type == "threadpool": + default_env_vars |= {"OMP_NUM_THREADS": f"{self.cores_per_task}"} + else: + # WQ sets OMP_NUM_THREADS itself + pass + + # yaml parsing might un-stringify some keys + env_vars = {k: str(v).upper() for k, v in (env_vars or {}).items()} + self.env_vars = default_env_vars | (env_vars or {}) + + return @property def name(self) -> str: return self.__class__.__name__ @property - def cores_available(self): - if type(self.parsl_provider) is LocalProvider: # noqa: F405 - cores_available = psutil.cpu_count(logical=False) - elif type(self.parsl_provider) is SlurmProvider: - cores_available = self.parsl_provider.cores_per_node - else: - cores_available = float("inf") - return cores_available + def lifetime(self) -> float: + """How long will this manager survive (in seconds)""" + return self.resources["lifetime"] @property - def max_workers(self): - if self._max_workers is not None: - return self._max_workers - else: - return max(1, math.floor(self.cores_available / self.cores_per_worker)) + def use_gpu(self) -> bool: + if self.executor_type == "threadpool": + return self.kwargs["use_gpu"] + return self.kwargs["gpus_per_task"] > 0 @property - def max_runtime(self): - if type(self.parsl_provider) is SlurmProvider: - walltime = pytimeparse.parse(self.parsl_provider.walltime) - else: - walltime = 1e9 - return walltime + def cores_per_task(self) -> int: + if self.executor_type == "workqueue": + return self.kwargs["cores_per_task"] + # assumes all threads are working + cores_per_thread = self.resources["cores"] / self.kwargs["max_threads"] + return max(int(cores_per_thread), 1) - def create_executor(self, path: Path) -> ParslExecutor: - if self.use_threadpool: - executor = ThreadPoolExecutor( - max_threads=self.cores_per_worker, - working_dir=str(path), - label=self.name, - ) - else: - cores = self.max_workers * self.cores_per_worker - worker_options = [ - "--parent-death", - "--wall-time={}".format(self.max_runtime), - "--cores={}".format(cores), - ] - if self.gpu: - worker_options.append("--gpus={}".format(self.max_workers)) - - # ensure proper scale in - if getattr(self.parsl_provider, "nodes_per_block", 1) > 1: - worker_options.append("--idle-timeout={}".format(int(1e6))) - else: - worker_options.append("--idle-timeout={}".format(20)) - - # only ModelEvaluation / ModelTraining / default_htex run in containers - if not isinstance(self, ReferenceEvaluation) and self.container: - prepend = self.container.launch_command(self.gpu) - worker_executable = f"{prepend} work_queue_worker" - else: - worker_executable = "work_queue_worker" - - executor = MyWorkQueueExecutor( - label=self.name, - working_dir=str(path / self.name), - provider=self.parsl_provider, - shared_fs=True, - autocategory=False, - port=0, - max_retries=1, - coprocess=False, - worker_options=" ".join(worker_options), - worker_executable=worker_executable, - scaling_cores_per_worker=cores, - ) + @property + def task_slots(self) -> int: + if self.executor_type == "threadpool": + return self.kwargs["max_threads"] + + gpu_slots, memory_slots = float("inf"), float("inf") + cpu_slots = self.resources["cores"] // self.cores_per_task + if self.use_gpu: + gpu_slots = self.resources["gpus"] // self.kwargs["gpus_per_task"] + if (mem_per_task := self.kwargs["mem_per_task"]) > 0: + memory_slots = self.resources["memory"] // mem_per_task + return min(cpu_slots, gpu_slots, memory_slots) + + def wrap_in_timeout(self, command: str) -> str: + if self.max_runtime == float("inf"): + return command # noop + + # send SIGTERM after max_runtime, follow with SIGKILL 30s later + return f"timeout -k 30s {self.max_runtime}s {command}" + + # def wrap_in_srun(self, command: str) -> str: + # # TODO: stub -- this does not work + # if self.provider is None: + # return command # noop + # return f"srun -t 1 -c $CORES {command}" + + def _create_threadpool(self, path: Path) -> ThreadPoolExecutor: + max_threads = self.kwargs["max_threads"] + return ThreadPoolExecutor(self.name, max_threads, working_dir=str(path)) + + def _create_workqueue(self, path: Path) -> WorkQueueExecutor: + """See https://cctools.readthedocs.io/en/latest/man_pages/work_queue_worker/#synopsis""" + + # ensure proper scale in # TODO: why is this needed? + timeout = int(1e6) if self.resources["nodes"] > 1 else 20 + cores = self.resources["cores"] + + worker_options = ["--parent-death", f"--cores={cores}", f"--timeout={timeout}"] + if (memory := self.resources["memory"]) is not None: + worker_options.append(f"--memory={memory * 1000}") # in MB + if (lifetime := self.lifetime) != float("inf"): + # allow some margin for WQ startup + walltime = max(0.95 * lifetime, lifetime - 30) + worker_options.append(f"-wall-time={walltime}") + if self.use_gpu: + gpus = self.resources["gpus"] + worker_options.append(f"--gpus={gpus}") + + worker_executable = "work_queue_worker" + if not isinstance(self, ReferenceEvaluation) and self.container: + # ModelEvaluation / ModelTraining run in container themselves + # Reference launches tasks in container + prepend = self.container.launch_command() + worker_executable = f"{prepend} {worker_executable}" + + executor = WorkQueueExecutor( + label=self.name, + working_dir=str(path / self.name), + provider=self.provider, + shared_fs=True, + port=0, # avoid multiple executors trying to use the same port + worker_options=" ".join(worker_options), + worker_executable=worker_executable, + scaling_cores_per_worker=cores, + ) return executor + def create_executor(self, path: Path) -> ParslExecutor: + if self.executor_type == "threadpool": + return self._create_threadpool(path) + return self._create_workqueue(path) + + def wq_resources(self, *args, **kwargs) -> dict: + raise NotImplementedError + @classmethod def from_config( cls, - gpu: bool = False, - cores_per_worker: int = 1, - use_threadpool: bool = False, + executor: str = "workqueue", container: Optional[ContainerSpec] = None, **kwargs, ): - # search for any section in the config which defines the Parsl ExecutionProvider - # if none are found, default to LocalProvider - # currently only checking for SLURM - if "slurm" in kwargs: - provider_cls = SlurmProvider - provider_kwargs = kwargs.pop("slurm") # do not allow empty dict - provider_kwargs["init_blocks"] = 0 - provider_kwargs.setdefault("exclusive", False) + if executor == "threadpool": + ensure(container is None, msg="Threadpool not compatible with containers") + ensure("max_threads" in kwargs, msg="Specify 'max_threads' for parallelism") + ensure( + "slurm" not in kwargs, + msg="Threadpool not compatible with remote execution", + ) + executor_kwargs = { + "max_threads": kwargs["max_threads"], + "use_gpu": kwargs.get("use_gpu", False), + } + elif executor == "workqueue": + executor_kwargs = { + "cores_per_task": kwargs.get("cores_per_task", 1), + "gpus_per_task": kwargs.get("gpus_per_task", 0), + "mem_per_task": kwargs.get("mem_per_task", 0), + } + if executor_kwargs["cores_per_task"] == 0: + raise ConfigurationError("WQ needs at least one core to launch tasks") + min_runtime = kwargs.get("min_runtime", "00:00:00") + executor_kwargs["min_runtime"] = str_to_timedelta(min_runtime).seconds else: - provider_cls = LocalProvider # noqa: F405 - provider_kwargs = kwargs.pop("local", {}) + raise ConfigurationError("Invalid executor key") - # if multi-node blocks are requested, make sure we're using SlurmProvider - if provider_kwargs.get("nodes_per_block", 1) > 1: - launcher = SlurmLauncher() + # search for Parsl ExecutionProvider block, defaulting to "local" + if "slurm" in kwargs: + # use SlurmLauncher if multi-node blocks are requested TODO: what does this fix? + provider, resources = make_slurm_provider(kwargs["slurm"]) + launcher = SlurmLauncher() if resources["nodes"] > 1 else SimpleLauncher() + provider.launcher = launcher else: - launcher = SimpleLauncher() + provider, resources = make_local_provider(kwargs.get("local", {})) + if executor == "threadpool": + provider = None # no provider needed - if container is not None: - # TODO: why not exactly? - assert not use_threadpool - - # initialize provider - parsl_provider = provider_cls( - launcher=launcher, - **provider_kwargs, - ) return cls( - parsl_provider=parsl_provider, - gpu=gpu, - use_threadpool=use_threadpool, + provider=provider, + executor_type=executor, + executor_kwargs=executor_kwargs, + resources=resources, container=container, - cores_per_worker=cores_per_worker, **kwargs, ) -@typeguard.typechecked class ModelEvaluation(ExecutionDefinition): def __init__( self, - max_simulation_time: Optional[float] = None, - timeout: float = (10 / 60), # 5 seconds - env_vars: Optional[dict[str, str]] = None, + timeout: float = 10.0, + max_resource_multiplier: int | None = None, + allow_oversubscription: bool = True, **kwargs, - ) -> None: + ): super().__init__(**kwargs) - if max_simulation_time is not None: - assert max_simulation_time * 60 < self.max_runtime - self.max_simulation_time = max_simulation_time + + if self.use_gpu and self.kwargs["gpus_per_task"] > 1: + raise ConfigurationError("No Hamiltonian can do multi-GPU evaluation") + + # i-Pi will kill client connections after no response for timeout seconds self.timeout = timeout - default_env_vars = { - "OMP_NUM_THREADS": str(self.cores_per_worker), - "KMP_AFFINITY": "granularity=fine,compact,1,0", - "KMP_BLOCKTIME": "1", - "OMP_PROC_BIND": "false", - "PYTHONUNBUFFERED": "TRUE", - } - if env_vars is None: - env_vars = default_env_vars - else: - default_env_vars.update(env_vars) - env_vars = default_env_vars - self.env_vars = env_vars - - def server_command(self): - command_list = ["psiflow-server"] - if self.max_simulation_time is not None: - max_time = 0.9 * (60 * self.max_simulation_time) - command_list = ["timeout -s 15 {}s".format(max_time), *command_list] - return " ".join(command_list) - - def client_command(self): - command_list = ["psiflow-client"] - return " ".join(command_list) - - # def get_client_args( - # self, - # hamiltonian_name: str, - # nwalkers: int, - # motion: str, - # ) -> list[str]: - # # TODO: redo this - # if "MACE" in hamiltonian_name: - # if motion in ["minimize", "vibrations"]: - # dtype = "float64" - # else: - # dtype = "float32" - # nclients = min(nwalkers, self.max_workers) - # if self.gpu: - # template = "--dtype={} --device=cuda:{}" - # args = [template.format(dtype, i) for i in range(nclients)] - # else: - # template = "--dtype={} --device=cpu" - # args = [template.format(dtype) for i in range(nclients)] - # return args - # else: - # return [""] - - def get_driver_devices(self, nwalkers: int) -> list[dict]: - # assumes driver is GPU capable - # TODO: what if only 1 gpu is available? - nclients = min(nwalkers, self.max_workers) - if self.gpu: - return [{'device': f'cuda:{i}'} for i in range(nclients)] - else: - return [{'device': 'cpu'} for _ in range(nclients)] - - def wq_resources(self, nwalkers): - if self.use_threadpool: - return {} - nclients = min(nwalkers, self.max_workers) - resource_specification = {} - resource_specification["cores"] = nclients * self.cores_per_worker - resource_specification["disk"] = 1000 # some random nontrivial amount? - memory = 2000 * self.cores_per_worker # similarly rather random - resource_specification["memory"] = int(memory) - resource_specification["running_time_min"] = self.max_simulation_time - if self.gpu: - resource_specification["gpus"] = nclients - return resource_specification - - -@typeguard.typechecked + # allow MD tasks to consume more computational resources based on walkers and hamiltonians + # but never more than available in a single resource block + if max_resource_multiplier is None: + max_resource_multiplier = self.task_slots + elif max_resource_multiplier > self.task_slots: + logger.warning( + "Provided 'max_resource_multiplier' exceeds available task slots " + f"({max_resource_multiplier} -> {self.task_slots}). " + f"Limiting 'max_resource_multiplier'." + ) + max_resource_multiplier = self.task_slots + self.max_resource_multiplier = max_resource_multiplier + + # whether i-Pi clients are allowed to share cores/GPUs + self.allow_oversubscription = allow_oversubscription + + def server_command(self) -> str: + command = "psiflow-server" + return self.wrap_in_timeout(command) + + def get_driver_resources(self, n_walkers: int, n_drivers: int) -> list[dict]: + """Divide 'expensive' drivers over available resources.""" + n_clients = n_walkers * n_drivers + m = self.max_resource_multiplier + + if n_drivers > m and not self.allow_oversubscription: + # the combination of drivers does not fit on available resources + raise ConfigurationError( + f"Simulation with {n_drivers} independent drivers not possible. " + f"Either increase 'max_resource_multiplier' or enable resource oversubscription." + ) + if n_clients > m and self.allow_oversubscription: + logger.warning( + f"Simulation wants to employ {n_clients} clients, " + f"but can only use {m}x the per-client budget. " + f"Oversubscribing CPU/GPU resources." + ) + elif n_clients > m and not self.allow_oversubscription: + # limit total numer of clients so they do not fight over resources + n_clients = m + + # TODO: what if (n_clients % n_drivers != 0) + # you will have more copies of some drivers and fewer of others.. + # TODO: what if (n_clients % m != 0) + # you will have more clients on some GPUs than others + + if not self.use_gpu: + return [{"device": "cpu"} for _ in range(n_clients)] + return [{"device": f"cuda:{_ % m}"} for _ in range(n_clients)] + + def wq_resources(self, n_clients: int) -> dict: + if self.spec is None: + return {} # threadpool + + spec = self.spec.copy() + multi = min(n_clients, self.max_resource_multiplier) + spec["cores"] *= multi + spec["gpus"] *= multi + spec["memory"] *= multi + return spec + + class ModelTraining(ExecutionDefinition): - def __init__( - self, - gpu=True, - max_training_time: Optional[float] = None, - env_vars: Optional[dict[str, str]] = None, - multigpu: bool = False, - **kwargs, - ) -> None: - super().__init__(gpu=gpu, **kwargs) - assert self.gpu - if max_training_time is not None: - assert max_training_time * 60 < self.max_runtime - self.max_training_time = max_training_time - self.multigpu = multigpu - if self.multigpu: - message = ( - "the max_training_time keyword does not work " - "in combination with multi-gpu training. Adjust " - "the maximum number of epochs to control the " - "duration of training" + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + + if not self.use_gpu: + logger.warning( + "ModelTraining is configured for CPU operation. Is this what you want?" ) - assert self.max_training_time is None, message - default_env_vars = { - "OMP_NUM_THREADS": str(self.cores_per_worker), - "KMP_AFFINITY": "granularity=fine,compact,1,0", - "KMP_BLOCKTIME": "1", - "OMP_PROC_BIND": "spread", # different from Model Eval - "PYTHONUNBUFFERED": "TRUE", - } - if env_vars is None: - env_vars = default_env_vars - else: - default_env_vars.update(env_vars) - env_vars = default_env_vars - self.env_vars = env_vars + # if self.multigpu: + # # TODO: why? Think this might be a multinode thing - which I do not care about + # message = ( + # "the max_training_time keyword does not work " + # "in combination with multi-gpu training. Adjust " + # "the maximum number of epochs to control the " + # "duration of training" + # ) + # assert self.max_runtime is None, message def train_command(self, initialize: bool = False): - # script = "$(python -c 'import psiflow.models.mace_utils; print(psiflow.models.mace_utils.__file__)')" - command_list = ["psiflow-mace-train"] - if (self.max_training_time is not None) and not initialize: - max_time = 0.9 * (60 * self.max_training_time) - command_list = ["timeout -s 15 {}s".format(max_time), *command_list] - return " ".join(command_list) - - def wq_resources(self): - if self.use_threadpool: - return {} - resource_specification = {} - - if self.multigpu: - nworkers = int(self.cores_available / self.cores_per_worker) - else: - nworkers = 1 + command = "psiflow-mace-train" + return self.wrap_in_timeout(command) - resource_specification["gpus"] = nworkers # one per GPU - resource_specification["cores"] = self.cores_available - resource_specification["disk"] = ( - 1000 * nworkers - ) # some random nontrivial amount? - memory = 1000 * self.cores_available # similarly rather random - resource_specification["memory"] = int(memory) - resource_specification["running_time_min"] = self.max_training_time - return resource_specification + def wq_resources(self, *args, **kwargs) -> dict: + if self.spec is None: + return {} # threadpool + return self.spec.copy() -@typeguard.typechecked class ReferenceEvaluation(ExecutionDefinition): def __init__( self, - spec: ReferenceSpec, - max_evaluation_time: Optional[float] = None, - memory_limit: Optional[str] = None, + reference: ReferenceSpec, + memory_limit: Optional[float] = None, **kwargs, ) -> None: - # TODO: how to know which code? super().__init__(**kwargs) - self.spec = spec - self.max_evaluation_time = max_evaluation_time * 60 # seconds - if max_evaluation_time: - assert 0 < max_evaluation_time < self.max_runtime - self.memory_limit = memory_limit + self.reference = reference + self.memory_limit = memory_limit # in GB + + if self.use_gpu: + logger.warning("Reference calculations do not support GPU computation yet.") def command(self): - launch_command = self.spec.launch_command() - kwargs = {k: getattr(self, k) for k in self.spec.reference_args} - launch_command = launch_command.format(**kwargs) + command = self.reference.launch_command() + kwargs = {k: getattr(self, k) for k in self.reference.reference_args} + command = command.format(**kwargs) if self.container is not None: - launch_command = f"{self.container.launch_command()} {launch_command}" - - if (max_time := self.max_evaluation_time) is None: - # leave some slack for startup and cleanup - max_time = max(0.9 * self.max_runtime, self.max_runtime - 5) - launch_command = f"timeout -s 9 {max_time}s {launch_command}" - - commands = [] - if self.memory_limit is not None: - # based on https://stackoverflow.com/a/42865957/2002471 - units = {"KB": 1, "MB": 2**10, "GB": 2**20, "TB": 2**30} - - def parse_size(size): # TODO: to utils? - size = size.upper() - if not re.match(r" ", size): - size = re.sub(r"([KMGT]?B)", r" \1", size) - number, unit = [string.strip() for string in size.split()] - return int(float(number) * units[unit]) - - commands.append(f"ulimit -v {parse_size(self.memory_limit)}") - - # exit code 0 so parsl always thinks bash app succeeded - return "\n".join([*commands, launch_command, "exit 0"]) - - def wq_resources(self): - if self.use_threadpool: - return {} - resource_specification = {} - resource_specification["cores"] = self.cores_per_worker - resource_specification["disk"] = 1000 # some random nontrivial amount? - memory = 2000 * self.cores_per_worker # similarly rather random - resource_specification["memory"] = int(memory) - resource_specification["running_time_min"] = self.max_evaluation_time - return resource_specification - - @property - def name(self) -> str: - return self.spec.name + command = f"{self.container.launch_command()} {command}" + if (mem := self.memory_limit) is not None: + # set max RAM usage and disable swap storage - requires systemd-run + command = f"systemd-run --user --scope -p MemoryMax={mem}G -p MemorySwapMax=0 {command}" + return self.wrap_in_timeout(command) -class ReferenceSpec(Protocol): - name: ClassVar[str] - reference_args: ClassVar[tuple[str, ...]] - mpi_command: str - mpi_args: Iterable[str] - executable: str - - def launch_command(self) -> str: - raise NotImplementedError + def wq_resources(self, n_cores: int | None) -> dict: + if self.spec is None: + return {} # threadpool + fraction = 1 + if n_cores is not None: + fraction = n_cores / self.kwargs["cores_per_task"] + spec = self.spec.copy() + spec["cores"] = int(spec["cores"] * fraction) + spec["memory"] *= fraction + return spec -@dataclass -class CP2KReferenceSpec(ReferenceSpec): - name = "CP2K" - reference_args = ("cores_per_worker",) - mpi_command: str = "mpirun -np {cores_per_worker}" - mpi_args: tuple[str, ...] = ( - "-ENV OMP_NUM_THREADS=1", - "--bind-to core", - "--map-by core", - ) - executable: str = "cp2k.psmp -i cp2k.inp" - - def launch_command(self): - # use nprocs = ncores, nthreads = 1 - return " ".join([self.mpi_command, *self.mpi_args, self.executable]) - - -@dataclass -class GPAWReferenceSpec(ReferenceSpec): - name = "GPAW" - reference_args = ("cores_per_worker",) - mpi_command: str = "mpirun -np {cores_per_worker}" - mpi_args: tuple[str, ...] = ( - "-x OMP_NUM_THREADS=1", - "--bind-to core", - "--map-by core", - ) - executable: str = "gpaw python script_gpaw.py input.json" - - def launch_command(self): - # use nprocs = ncores, nthreads = 1 - return " ".join([self.mpi_command, *self.mpi_args, self.executable]) - - -@dataclass -class ORCAReferenceSpec(ReferenceSpec): - name = "ORCA" - reference_args = () - mpi_command: str = "" - mpi_args: tuple[str, ...] = ( - "-x OMP_NUM_THREADS=1", - "--bind-to core", - "--map-by core", - ) - executable: str = "$(which orca) orca.inp" - - def launch_command(self): - mpi_str = " ".join(self.mpi_args) - return f'{self.executable} "{mpi_str}"' + @property + def name(self) -> str: + if not hasattr(self, "reference"): + return super().name # during init + return self.reference.name -@typeguard.typechecked class ExecutionContext: """ Psiflow centralizes all execution-level configuration options using an ExecutionContext. @@ -513,7 +617,6 @@ class ExecutionContext: and QM evaluation apps. As such, we ensure that execution-side details are strictly separated from the definition of the computational graph itself. For more information, check out the psiflow documentation regarding execution. - """ def __init__( @@ -521,12 +624,20 @@ def __init__( config: Config, definitions: list[ExecutionDefinition], path: Union[Path, str], + tmpdir_root: str, + keep_tmpdirs: bool, + **kwargs, ) -> None: self.config = config self.path = Path(path).resolve() self.path.mkdir(parents=True, exist_ok=True) + self.definitions = {d.name: d for d in definitions} - assert len(self.definitions) == len(definitions) + ensure(len(self.definitions) == len(definitions)) + + # make sure task tmpdirs can be made + Path(tmpdir_root).mkdir(parents=True, exist_ok=True) + self.bash_template = create_bash_template(tmpdir_root, keep_tmpdirs) self.file_index = {} self.lock = Lock() parsl.load(config) @@ -540,128 +651,79 @@ def __exit__(self, exc_type, exc_value, traceback): parsl.dfk().cleanup() def new_file(self, prefix: str, suffix: str) -> File: + assert prefix[-1] == "_" + assert suffix[0] == "." + padding = 6 with self.lock: - assert prefix[-1] == "_" - assert suffix[0] == "." key = (prefix, suffix) if key not in self.file_index.keys(): self.file_index[key] = 0 - padding = 6 assert self.file_index[key] < (16**padding) - identifier = "{0:0{1}x}".format(self.file_index[key], padding) + identifier = f"{self.file_index[key]:0{padding}x}" self.file_index[key] += 1 return File(str(self.path / (prefix + identifier + suffix))) @classmethod def from_config( cls, - parsl_log_level: str = "WARNING", - usage_tracking: int = 3, - retries: int = 2, - strategy: str = "simple", - max_idletime: float = 20, - internal_tasks_max_threads: int = 10, - default_threads: int = 4, - htex_address: str = "127.0.0.1", - zip_staging: Optional[bool] = None, - make_symlinks: bool = False, + parsl_log_level: str, + psiflow_log_level: str, + default_threads: int, **kwargs, - ) -> ExecutionContext: + ) -> "ExecutionContext": path = Path.cwd().resolve() / PSIFLOW_INTERNAL - psiflow.resolve_and_check(path) if path.exists(): shutil.rmtree(path) - path.mkdir(parents=True, exist_ok=True) - parsl.set_file_logger( - filename=str(path / "parsl.log"), - name="parsl", - level=getattr(logging, parsl_log_level), - ) + path.mkdir(parents=True) + patch_parsl_dirtree() + + # setup logging + log_file = str(path / PARSL_LOGFILE) + log_level = getattr(logging, parsl_log_level) + parsl.set_file_logger(filename=log_file, name="parsl", level=log_level) + setup_logging(file=path / PSIFLOW_LOGFILE, level=psiflow_log_level) + + # default container for ModelEvaluation and ModelTraining + base_container = None + if "container" in kwargs: + base_container = make_cls(ContainerSpec, **kwargs["container"]) # create definitions - base_container = ContainerSpec.from_kwargs(kwargs) - kwargs.pop("container_uri", None) model_evaluation = ModelEvaluation.from_config( - container=base_container, - **kwargs.pop("ModelEvaluation", {}), + container=base_container, **kwargs["ModelEvaluation"] ) model_training = ModelTraining.from_config( - container=base_container, - **kwargs.pop("ModelTraining", {"gpu": True}), # avoid triggering assertion + container=base_container, **kwargs["ModelTraining"] ) - reference_evaluations = [] # reference evaluations might be class specific - for key in list(kwargs.keys()): - if key[:4] in REFERENCE_SPECS: # allow for e.g., CP2K_small - config = kwargs.pop(key) + reference_evaluations = [] # reference evaluations are class specific + for key, reference_cls in REFERENCE_SPECS.items(): + if key in kwargs: + config = kwargs[key] + container = None + if "container" in config: + container = make_cls(ContainerSpec, **config.pop("container")) + reference = make_cls(reference_cls, **config) reference_evaluation = ReferenceEvaluation.from_config( - spec=init_spec(REFERENCE_SPECS[key[:4]], config), - container=ContainerSpec.from_kwargs(kwargs | config), - **config, + reference=reference, + container=container, + **config, # make sure the container key is removed ) reference_evaluations.append(reference_evaluation) definitions = [model_evaluation, model_training, *reference_evaluations] # create main parsl executors executors = [d.create_executor(path=path) for d in definitions] + internal = make_default_executors(default_threads, path, base_container) + executors.extend(internal) - # create default executors - if base_container is not None: - launcher = WrappedLauncher(prepend=base_container.launch_command()) - else: - launcher = SimpleLauncher() - htex = HighThroughputExecutor( - label="default_htex", - address=htex_address, - working_dir=str(path / "default_htex"), - cores_per_worker=1, - max_workers_per_node=default_threads, - cpu_affinity="none", - provider=LocalProvider(launcher=launcher, init_blocks=0), # noqa: F405 - ) - threadpool = ThreadPoolExecutor( - label="default_threads", - max_threads=default_threads, - working_dir=str(path), - ) - executors.extend([htex, threadpool]) - - # remove additional kwargs - # if zip_staging: - - # def zip_uri(base, task_record, err_or_out): - # zip_path = base / "base.zip" - # file = f"{task_record['func_name']}.{task_record['id']}.{task_record['try_id']}.{err_or_out}" - # return File(f"zip:{zip_path}/{file}") - - # std_autopath = partial(zip_uri, path) - # else: - # std_autopath = None - config = Config( + config = make_cls( + Config, executors=executors, run_dir=str(path), initialize_logging=False, - app_cache=False, - usage_tracking=usage_tracking, - retries=retries, - strategy=strategy, - max_idletime=max_idletime, - internal_tasks_max_threads=internal_tasks_max_threads, - # std_autopath=std_autopath, + **kwargs, ) - context = ExecutionContext(config, definitions, path / "context_dir") - - if make_symlinks: - src, dest = Path.cwd() / "psiflow_log", path / "parsl.log" - _create_symlink(src, dest) - src, dest = ( - Path.cwd() / "psiflow_submit_scripts", - path / "000" / "submit_scripts", - ) - _create_symlink(src, dest, is_dir=True) - src, dest = Path.cwd() / "psiflow_task_logs", path / "000" / "task_logs" - _create_symlink(src, dest, is_dir=True) - - return context + return ExecutionContext(config, definitions, path / CONTEXT_DIR, **kwargs) class ExecutionContextLoader: @@ -670,33 +732,28 @@ class ExecutionContextLoader: @classmethod def load( cls, - psiflow_config: Optional[dict[str, Any]] = None, + config: Optional[dict[str, Any]] = None, ) -> ExecutionContext: if cls._context is not None: raise RuntimeError("ExecutionContext has already been loaded") - if psiflow_config is None: # assume yaml is passed as argument - if len(sys.argv) == 1: # no config passed, use threadpools: - psiflow_config = { - "ModelEvaluation": { - "gpu": False, - "use_threadpool": True, - }, - "ModelTraining": { - "gpu": True, - "use_threadpool": True, - }, - } - else: - assert len(sys.argv) == 2 - path_config = psiflow.resolve_and_check(Path(sys.argv[1])) - assert path_config.exists() - assert path_config.suffix in [".yaml", ".yml"], ( - "the execution configuration needs to be specified" - " as a YAML file, but got {}".format(path_config) - ) - with open(path_config, "r") as f: - psiflow_config = yaml.safe_load(f) - cls._context = ExecutionContext.from_config(**psiflow_config) + if config is not None: + pass + elif len(sys.argv) == 1: + config = {} + else: + assert len(sys.argv) <= 2 # only accept a single argument + path_config = Path(sys.argv[1]) + assert path_config.exists() + assert path_config.suffix in [".yaml", ".yml"], ( + f"the execution configuration needs to be specified" + f" as a YAML file, but got {path_config}" + ) + with open(path_config, "r") as f: + config = yaml.safe_load(f) + + # set the context so it can be retrieved later + config = yaml.safe_load(DEFAULT_CONFIG) | config + cls._context = ExecutionContext.from_config(**config) return cls._context @classmethod @@ -711,6 +768,7 @@ def wait(cls): class SlurmLauncher(Launcher): + # TODO: what does this do? def __init__(self, debug: bool = True, overrides: str = ""): super().__init__(debug=debug) self.overrides = overrides @@ -745,30 +803,23 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s return x -class MyWorkQueueExecutor(WorkQueueExecutor): - def _get_launch_command(self, block_id): - return self.worker_command +def patch_parsl_dirtree() -> None: + """By default, Parsl will put Executor logs etc. under numbered directories. + We do not need this level of nesting, as psiflow_internal is refreshed every run""" + import parsl.dataflow.dflow + # replace with noop, which needs to happen after parsl.dataflow.dflow initialises + parsl.dataflow.dflow.make_rundir = lambda x: x -def _create_symlink(src: Path, dest: Path, is_dir: bool = False) -> None: - """Create or replace symbolic link""" - if src.is_symlink(): - src.unlink() - if is_dir: - dest.mkdir(parents=True, exist_ok=True) - else: - dest.touch(exist_ok=True) - src.symlink_to(dest, target_is_directory=is_dir) - -REFERENCE_SPECS = { - "CP2K": CP2KReferenceSpec, - "GPAW": GPAWReferenceSpec, - "ORCA": ORCAReferenceSpec, -} +# TODO: after 3.12, this is no longer needed +# https://docs.python.org/3/library/typing.html +T = TypeVar("T") -def init_spec(spec_cls: type(ReferenceSpec), kwargs: dict) -> ReferenceSpec: - keys = ("mpi_command", "mpi_args", "executable") - cls_kwargs = {k: kwargs[k] for k in keys if k in kwargs} - return spec_cls(**cls_kwargs) +def make_cls(cls: type[T], **kwargs: Any) -> T: + """Very simple class factory. Use introspection to filter args and kwargs.""" + sign = inspect.signature(cls) + argument_names = list(sign.parameters.keys()) + arguments = {k: kwargs[k] for k in argument_names if k in kwargs} + return cls(**arguments) diff --git a/psiflow/free_energy/phonons.py b/psiflow/free_energy/phonons.py index c93dd47d..8728fd59 100644 --- a/psiflow/free_energy/phonons.py +++ b/psiflow/free_energy/phonons.py @@ -17,10 +17,10 @@ make_driver_commands, make_wait_for_sockets_command, ) -from psiflow.sampling.optimize import setup_forces, export_env_command +from psiflow.sampling.optimize import setup_forces from psiflow.utils.apps import multiply from psiflow.utils.io import load_numpy, save_xml -from psiflow.utils import TMP_COMMAND, CD_COMMAND +from psiflow.utils.parse import format_env_vars def _compute_frequencies(hessian: np.ndarray, geometry: Geometry) -> np.ndarray: @@ -88,6 +88,7 @@ def _execute_ipi( driver_kwargs: list[dict], command_server: str, env_vars: dict = {}, + bash_template: str = "", stdout: str = parsl.AUTO_LOGNAME, stderr: str = parsl.AUTO_LOGNAME, inputs: list = [], @@ -102,18 +103,15 @@ def _execute_ipi( set(d["address"] for d in driver_kwargs) ) commands_driver = make_driver_commands(driver_kwargs, file_xyz_in, files_in) - command_list = [ - TMP_COMMAND, - CD_COMMAND, - export_env_command(env_vars), command_start, command_wait, *commands_driver, "wait", f"cp i-pi.output_full.hess {outputs[0]}", ] - return "\n".join(command_list) + commands, env = "\n".join(command_list), format_env_vars(env_vars) + return bash_template.format(commands=commands, env=env) execute_ipi = bash_app(_execute_ipi, executors=["ModelEvaluation"]) @@ -164,13 +162,14 @@ def compute_harmonic( inputs.append(comp.hamiltonian.serialize_function(dtype="float64")) kwargs = {"idx": i, "address": comp.address} if isinstance(comp.hamiltonian, MACEHamiltonian): - kwargs |= definition.get_driver_devices(1)[0] + kwargs |= definition.get_driver_resources(1, 1)[0] driver_kwargs.append(kwargs) result = execute_ipi( driver_kwargs, definition.server_command(), env_vars=definition.env_vars, + bash_template=context.bash_template, inputs=inputs, outputs=[context.new_file("hess_", ".txt")], parsl_resource_specification=definition.wq_resources(1), diff --git a/psiflow/functions.py b/psiflow/functions.py index e12bec91..8689616e 100644 --- a/psiflow/functions.py +++ b/psiflow/functions.py @@ -296,6 +296,7 @@ class DispersionFunction(EnergyFunction): def __post_init__(self): # OMP_NUM_THREADS for parallel evaluation does not work.. # https://github.com/dftd3/simple-dftd3/issues/49 + # TODO: check whether this is still the case os.environ["OMP_NUM_THREADS"] = str(self.num_threads * 10) from dftd3.ase import DFTD3 diff --git a/psiflow/hamiltonians.py b/psiflow/hamiltonians.py index a065484d..59ca43eb 100644 --- a/psiflow/hamiltonians.py +++ b/psiflow/hamiltonians.py @@ -444,9 +444,9 @@ def parameters(self) -> dict: return { "model_path": model_path, "atomic_energies": self.atomic_energies, - "ncores": evaluation.cores_per_worker, + "ncores": evaluation.cores_per_task, "dtype": "float32", - "device": "gpu" if evaluation.gpu else "cpu", + "device": "gpu" if evaluation.use_gpu else "cpu", "env_vars": evaluation.env_vars, } diff --git a/psiflow/learning.py b/psiflow/learning.py index 3191a14c..eab93d18 100644 --- a/psiflow/learning.py +++ b/psiflow/learning.py @@ -1,6 +1,7 @@ from __future__ import annotations # necessary for type-guarding class methods import shutil +import logging from pathlib import Path from typing import Optional, Union @@ -17,9 +18,10 @@ from psiflow.models import Model from psiflow.reference import Reference from psiflow.sampling import SimulationOutput, Walker, sample -from psiflow.utils.apps import boolean_or, isnan, setup_logger, unpack_i +from psiflow.utils.apps import boolean_or, isnan -logger = setup_logger(__name__) + +logger = logging.getLogger(__name__) # logging per module @typeguard.typechecked diff --git a/psiflow/metrics.py b/psiflow/metrics.py index 2f733862..0234642b 100644 --- a/psiflow/metrics.py +++ b/psiflow/metrics.py @@ -1,6 +1,7 @@ from __future__ import annotations # necessary for type-guarding class methods import os +import logging from pathlib import Path from typing import Optional, Union @@ -15,9 +16,10 @@ from psiflow.hamiltonians import Hamiltonian from psiflow.models import Model from psiflow.sampling import SimulationOutput -from psiflow.utils.apps import combine_futures, log_message, setup_logger +from psiflow.utils.apps import log_message +# from psiflow.utils.apps import combine_futures, log_message, setup_logger -logger = setup_logger(__name__) +logger = logging.getLogger(__name__) # logging per module @typeguard.typechecked diff --git a/psiflow/models/_mace.py b/psiflow/models/_mace.py index 5ed67724..46fc04d9 100644 --- a/psiflow/models/_mace.py +++ b/psiflow/models/_mace.py @@ -227,7 +227,7 @@ def _create_apps(self): # initialize apps app_initialize = bash_app(initialize, executors=[evaluation.name]) resources_init = evaluation.wq_resources(1) # TODO: find a better way for model init - if not evaluation.use_threadpool: + if not evaluation.executor_type == "threadpool": resources_init["running_time_min"] = 30 # at least 30 mins for init? app_train = bash_app(train, executors=[training.name]) resources_train = training.wq_resources() diff --git a/psiflow/models/model.py b/psiflow/models/model.py index 6e11b04a..73c9cb1a 100644 --- a/psiflow/models/model.py +++ b/psiflow/models/model.py @@ -1,5 +1,6 @@ from __future__ import annotations # necessary for type-guarding class methods +import logging from dataclasses import asdict from pathlib import Path from typing import Optional, Union @@ -11,10 +12,11 @@ import psiflow from psiflow.data import Dataset -from psiflow.utils.apps import copy_data_future, log_message, setup_logger +from psiflow.utils.apps import copy_data_future, log_message from psiflow.utils.io import save_yaml -logger = setup_logger(__name__) + +logger = logging.getLogger(__name__) @typeguard.typechecked diff --git a/psiflow/order_parameters.py b/psiflow/order_parameters.py deleted file mode 100644 index 204de2ea..00000000 --- a/psiflow/order_parameters.py +++ /dev/null @@ -1,2 +0,0 @@ -class OrderParameter: - pass diff --git a/psiflow/reference/cp2k_.py b/psiflow/reference/cp2k_.py index 6af60952..4d707f84 100644 --- a/psiflow/reference/cp2k_.py +++ b/psiflow/reference/cp2k_.py @@ -1,7 +1,7 @@ import copy import io import warnings -from typing import Optional, Union +from typing import Optional, Union, ClassVar import numpy as np from ase.data import chemical_symbols @@ -15,8 +15,6 @@ from psiflow.geometry import Geometry from psiflow.reference.reference import Reference, Status, get_spin_multiplicities from psiflow.utils.parse import find_line, lines_to_array -from psiflow.utils import TMP_COMMAND, CD_COMMAND - # costly to initialise input_parser = CP2KInputParserSimplified( @@ -58,15 +56,16 @@ def modify_input(input_dict: dict, properties: tuple) -> None: def parse_output(output_str: str, properties: tuple) -> dict[str, float | np.ndarray]: + """Very basic output parser. Perhaps check the cp2k-output-tools package?""" lines = output_str.split("\n") data = {} # output status - idx = find_line(lines, "CP2K", reverse=True, max_lines=250) + key = "SUBROUTINE" + idx = find_line(lines, key, reverse=True, max_lines=100) data["status"] = status = Status.SUCCESS if idx is not None else Status.FAILED if status == Status.SUCCESS: - # total runtime - data["runtime"] = float(lines[idx].split()[-1]) + data["runtime"] = float(lines[idx + 2].split()[-1]) # total runtime # find number of atoms idx = find_line(lines, "TOTAL NUMBERS AND MAXIMUM NUMBERS") @@ -79,7 +78,7 @@ def parse_output(output_str: str, properties: tuple) -> dict[str, float | np.nda data["positions"] = lines_to_array(lines[idx : idx + natoms], 4, 7) # read energy - key = "ENERGY| Total FORCE_EVAL ( QS ) energy [a.u.]" + key = "ENERGY| Total FORCE_EVAL ( QS ) energy" idx = find_line(lines, key, idx) data["energy"] = float(lines[idx].split()[-1]) * Ha @@ -87,28 +86,23 @@ def parse_output(output_str: str, properties: tuple) -> dict[str, float | np.nda return data # read forces - key = "ATOMIC FORCES in [a.u.]" - idx = find_line(lines, key, idx) + 3 - forces = lines_to_array(lines[idx : idx + natoms], 3) + key = "FORCES| Atomic forces" + idx = find_line(lines, key, idx) + 2 + forces = lines_to_array(lines[idx : idx + natoms], 2, 5) return data | {"forces": forces * Ha / Bohr} @psiflow.serializable class CP2K(Reference): + executor: ClassVar[str] = "CP2K" _execute_label = "cp2k_singlepoint" input_dict: dict - def __init__( - self, - input_str: str, - executor: str = "CP2K", - outputs: Union[tuple, list] = ("energy", "forces"), - ): - self.executor = executor - self.outputs = tuple(outputs) + def __init__(self, input_str: str, **kwargs): + super().__init__(**kwargs) self.input_dict = str_to_dict(input_str) - modify_input(self.input_dict, outputs) + modify_input(self.input_dict, self.outputs) self._create_apps() def compute_atomic_energy(self, element, box_size=None) -> AppFuture[float]: @@ -141,12 +135,7 @@ def get_single_atom_references(self, element: str) -> dict[int, Reference]: return references def get_shell_command(self, inputs: list[File]) -> str: - command_list = [ - TMP_COMMAND, - CD_COMMAND, - f"cp {inputs[0].filepath} cp2k.inp", - self.execute_command, - ] + command_list = [f"cp {inputs[0].filepath} cp2k.inp", self.execute_command] return "\n".join(command_list) def parse_output(self, stdout: str) -> dict: diff --git a/psiflow/reference/dummy.py b/psiflow/reference/dummy.py index f947efbf..eeb2baab 100644 --- a/psiflow/reference/dummy.py +++ b/psiflow/reference/dummy.py @@ -12,16 +12,16 @@ @psiflow.serializable class ReferenceDummy(Reference): + executor = "HTEX" _execute_label = "dummy_singlepoint" - def __init__(self, outputs: Union[tuple, list] = ("energy", "forces")): - self.outputs = outputs + def __init__(self, **kwargs): + super().__init__(**kwargs) self._create_apps() def _create_apps(self): # psiflow.context().definitions does not contain "default_htex" self.execute_command = "" - self.app_pre = self.create_input self.app_execute = partial( bash_app(_execute, executors=["default_htex"]), reference=self, diff --git a/psiflow/reference/gpaw_.py b/psiflow/reference/gpaw_.py index 5c6d5bdd..e787abbc 100644 --- a/psiflow/reference/gpaw_.py +++ b/psiflow/reference/gpaw_.py @@ -1,6 +1,6 @@ import json from pathlib import Path -from typing import Optional, Union +from typing import Optional, Union, ClassVar from parsl import File from parsl.dataflow.futures import AppFuture @@ -8,7 +8,6 @@ import psiflow from psiflow.geometry import Geometry from psiflow.reference.reference import Reference, Status -from psiflow.utils import TMP_COMMAND, CD_COMMAND from psiflow.utils.apps import copy_app_future from psiflow.utils.parse import find_line from psiflow.reference._gpaw import FILEPATH, DEFAULTS, STDOUT_KEY @@ -35,22 +34,16 @@ def parse_output(stdout: str, properties: tuple[str, ...]) -> dict: @psiflow.serializable class GPAW(Reference): + executor: ClassVar[str] = "GPAW" _execute_label = "gpaw_singlepoint" parameters: dict script: str - def __init__( - self, - parameters: dict, - script: str | Path = FILEPATH, - outputs: Union[tuple, list] = ("energy", "forces"), - executor: str = "GPAW", - ): - self.outputs = tuple(outputs) + def __init__(self, parameters: dict, script: str | Path = FILEPATH, **kwargs): + super().__init__(**kwargs) self.parameters = parameters assert (script := Path(script)).is_file() self.script = str(script.resolve()) # absolute path - self.executor = executor self._create_apps() def compute_atomic_energy(self, element, box_size=None) -> AppFuture: @@ -58,8 +51,6 @@ def compute_atomic_energy(self, element, box_size=None) -> AppFuture: def get_shell_command(self, inputs: list[File]) -> str: command_list = [ - TMP_COMMAND, - CD_COMMAND, f"cp {inputs[0].filepath} input.json", f"cp {self.script} script_gpaw.py", self.execute_command, diff --git a/psiflow/reference/orca_.py b/psiflow/reference/orca_.py index 05cd2d1c..cc2185bc 100644 --- a/psiflow/reference/orca_.py +++ b/psiflow/reference/orca_.py @@ -1,7 +1,7 @@ import warnings import re from functools import partial -from typing import Optional, Union +from typing import Optional, Union, ClassVar import ase.symbols import numpy as np @@ -12,8 +12,7 @@ import psiflow from psiflow.geometry import Geometry from psiflow.reference.reference import Reference, Status, get_spin_multiplicities -from psiflow.utils import TMP_COMMAND, CD_COMMAND -from psiflow.utils.parse import find_line, lines_to_array, string_to_timedelta +from psiflow.utils.parse import find_line, lines_to_array, str_to_timedelta KEY_GHOST = "ghost" @@ -95,7 +94,7 @@ def parse_output(stdout: str, properties: tuple[str, ...]) -> dict: if status == Status.SUCCESS: # total runtime idx = find_line(lines, "TOTAL RUN TIME", reverse=True, max_lines=5) - data["runtime"] = string_to_timedelta(lines[idx][16:]) + data["runtime"] = str_to_timedelta(lines[idx][16:]) # read coordinates idx_start = idx = find_line(lines, "CARTESIAN COORDINATES (ANGSTROEM)") + 2 @@ -122,20 +121,15 @@ def parse_output(stdout: str, properties: tuple[str, ...]) -> dict: @psiflow.serializable class ORCA(Reference): + executor: ClassVar[str] = "ORCA" _execute_label = "orca_singlepoint" input_template: str input_kwargs: dict - def __init__( - self, - input_template: str, - executor: str = "ORCA", - outputs: Union[tuple, list] = ("energy", "forces"), - ): - self.executor = executor - self.input_template = check_input(input_template, outputs) + def __init__(self, input_template: str, **kwargs): + super().__init__(**kwargs) + self.input_template = check_input(input_template, self.outputs) self.input_kwargs = DEFAULT_KWARGS.copy() # TODO: user control? - self.outputs = tuple(outputs) self._create_apps() def _create_apps(self): @@ -162,12 +156,7 @@ def get_single_atom_references(self, element: str) -> dict[int, Reference]: return references def get_shell_command(self, inputs: list[File]) -> str: - command_list = [ - TMP_COMMAND, - CD_COMMAND, - f"cp {inputs[0].filepath} orca.inp", - self.execute_command, - ] + command_list = [f"cp {inputs[0].filepath} orca.inp", self.execute_command] return "\n".join(command_list) def parse_output(self, stdout: str) -> dict: diff --git a/psiflow/reference/reference.py b/psiflow/reference/reference.py index 96ff842e..d049d9a9 100644 --- a/psiflow/reference/reference.py +++ b/psiflow/reference/reference.py @@ -1,6 +1,7 @@ from __future__ import annotations # necessary for type-guarding class methods import warnings +import logging from typing import ClassVar, Optional, Union, Callable, Sequence from pathlib import Path from functools import partial @@ -16,11 +17,11 @@ from psiflow.data import Computable, Dataset from psiflow.data.utils import extract_quantities from psiflow.geometry import Geometry, NullState -from psiflow.utils.apps import copy_app_future, setup_logger +from psiflow.utils.apps import copy_app_future from psiflow.utils.parse import LineNotFoundError, get_task_name_id -logger = setup_logger(__name__) # logging per module +logger = logging.getLogger(__name__) # logging per module class Status(Enum): @@ -93,12 +94,15 @@ def compute_dataset( def _execute( reference: Reference, inputs: list[File], + bash_template: str, parsl_resource_specification: Optional[dict] = None, stdout: str = parsl.AUTO_LOGNAME, stderr: str = parsl.AUTO_LOGNAME, label: str = "singlepoint", ) -> str: - return reference.get_shell_command(inputs) + # TODO: we do not set env_vars here? + command = reference.get_shell_command(inputs) + return bash_template.format(commands=command, env=">/dev/null") def _process_output( @@ -111,7 +115,7 @@ def _process_output( try: data = reference.parse_output(stdout) except LineNotFoundError: - # TODO: find out what went wrong + # TODO: find out what went wrong? data = {"status": Status.FAILED} data |= {"stdout": Path(inputs[0]), "stderr": Path(inputs[1])} return update_geometry(geom, data) @@ -120,10 +124,10 @@ def _process_output( @join_app def evaluate(reference: Reference, geom: Geometry) -> AppFuture[Geometry]: """""" - if geom == NullState: + if geom == NullState: # TODO: remove this warnings.warn("Skipping NullState..") return copy_app_future(geom) - execute, *files = reference.app_pre(geom=geom) + execute, *files = reference.create_input(geom=geom) if not execute: # TODO: should we reset geom? return copy_app_future(geom) future = reference.app_execute(inputs=files) @@ -135,14 +139,20 @@ def evaluate(reference: Reference, geom: Geometry) -> AppFuture[Geometry]: @psiflow.serializable class Reference(Computable): - outputs: Union[list[str], tuple[str, ...]] + outputs: Sequence[str] batch_size: ClassVar[int] = 1 # TODO: not really used - executor: str - app_pre: ClassVar[Callable] # TODO: fix serialisation - app_execute: ClassVar[Callable] + app_execute: ClassVar[Callable] # TODO: fix serialisation app_post: ClassVar[Callable] _execute_label: ClassVar[str] execute_command: str + executor: ClassVar[str] + n_cores: Optional[int] + + def __init__( + self, outputs: Sequence[str] = ("energy", "forces"), n_cores: int | None = None + ): + self.outputs: tuple[str, ...] = tuple(outputs) + self.n_cores = n_cores def compute( self, @@ -179,14 +189,16 @@ def compute_dataset(self, dataset: Dataset) -> Dataset: return Dataset(future) def _create_apps(self): - definition = psiflow.context().definitions[self.executor] + context = psiflow.context() + definition = context.definitions[self.executor] + if (n := self.n_cores) is not None: + assert n <= definition.spec["cores"] self.execute_command = definition.command() - wq_resources = definition.wq_resources() - self.app_pre = self.create_input self.app_execute = partial( bash_app(_execute, executors=[self.executor]), reference=self, - parsl_resource_specification=wq_resources, + bash_template=context.bash_template, + parsl_resource_specification=definition.wq_resources(n), label=self._execute_label, ) self.app_post = partial( diff --git a/psiflow/sampling/ase.py b/psiflow/sampling/ase.py index 87434fea..69dde699 100644 --- a/psiflow/sampling/ase.py +++ b/psiflow/sampling/ase.py @@ -1,3 +1,4 @@ +import logging from typing import Optional, Union import parsl @@ -9,15 +10,13 @@ from psiflow.data.utils import write_frames from psiflow.geometry import Geometry from psiflow.hamiltonians import Hamiltonian -from psiflow.utils.apps import setup_logger from psiflow.utils.io import _dump_json -from psiflow.utils import TMP_COMMAND, CD_COMMAND, export_env_command -from psiflow.utils.parse import get_task_name_id +from psiflow.utils.parse import get_task_name_id, format_env_vars from ._ase import ALLOWED_MODES, __file__ as file_ase -DEFAULT_EXECUTABLE = 'script.py' -logger = setup_logger(__name__) # logging per module +DEFAULT_EXECUTABLE = "script.py" +logger = logging.getLogger(__name__) class OptimisationFailedError(Exception): @@ -48,6 +47,7 @@ def _execute_ase( inputs: list[DataFuture], outputs: list[DataFuture], env_vars: dict = {}, + bash_template: str = "", stdout: str = parsl.AUTO_LOGNAME, stderr: str = parsl.AUTO_LOGNAME, parsl_resource_specification: Optional[dict] = None, @@ -63,14 +63,11 @@ def _execute_ase( command_opt_args.append(f"--output_traj={outputs[1].filepath}") command_list = [ - TMP_COMMAND, - CD_COMMAND, - export_env_command(env_vars), f"cp {inputs[0].filepath} {DEFAULT_EXECUTABLE}", " ".join(command_opt_args), - "exit 0", # ignore timeout exitcode ] - return "\n".join(command_list) + commands, env = "\n".join(command_list), format_env_vars(env_vars) + return bash_template.format(commands=commands, env=env) execute_ase = bash_app(_execute_ase, executors=["ModelEvaluation"]) @@ -93,12 +90,8 @@ def optimize( context = psiflow.context() definition = context.definitions["ModelEvaluation"] - - command_list = [f"python -u {DEFAULT_EXECUTABLE}"] - if definition.max_simulation_time is not None: - max_time = 0.9 * (60 * definition.max_simulation_time) - command_list = ["timeout -s 15 {}s".format(max_time), *command_list] - command_launch = " ".join(command_list) + command = f"python -u {DEFAULT_EXECUTABLE}" + command_launch = definition.wrap_in_timeout(command) input_geometry = Dataset([state]).extxyz # state can be future hamiltonian = 1.0 * hamiltonian # convert to mixture @@ -128,6 +121,7 @@ def optimize( result = execute_ase( command_launch=command_launch, env_vars=definition.env_vars, + bash_template=context.bash_template, inputs=inputs, outputs=outputs, parsl_resource_specification=definition.wq_resources(1), diff --git a/psiflow/sampling/optimize.py b/psiflow/sampling/optimize.py index 74d7d54e..3d09880e 100644 --- a/psiflow/sampling/optimize.py +++ b/psiflow/sampling/optimize.py @@ -20,7 +20,7 @@ ) from psiflow.sampling.output import HamiltonianComponent from psiflow.utils.io import save_xml -from psiflow.utils import TMP_COMMAND, CD_COMMAND, export_env_command +from psiflow.utils.parse import format_env_vars warnings.warn( @@ -97,6 +97,7 @@ def _execute_ipi( driver_kwargs: list[dict], command_server: str, env_vars: dict = {}, + bash_template: str = "", stdout: str = parsl.AUTO_LOGNAME, stderr: str = parsl.AUTO_LOGNAME, inputs: list = [], @@ -111,17 +112,14 @@ def _execute_ipi( set(d["address"] for d in driver_kwargs) ) commands_driver = make_driver_commands(driver_kwargs, file_xyz_in, files_in) - command_list = [ - TMP_COMMAND, - CD_COMMAND, - export_env_command(env_vars), command_start, command_wait, *commands_driver, "wait", ] - return "\n".join(command_list) + commands, env = "\n".join(command_list), format_env_vars(env_vars) + return bash_template.format(commands=commands, env=env) execute_ipi = bash_app(_execute_ipi, executors=["ModelEvaluation"]) @@ -181,13 +179,14 @@ def optimize( inputs.append(comp.hamiltonian.serialize_function(dtype="float64")) kwargs = {"idx": i, "address": comp.address} if isinstance(comp.hamiltonian, MACEHamiltonian): - kwargs |= definition.get_driver_devices(1)[0] + kwargs |= definition.get_driver_resources.get(1, 1)[0] driver_kwargs.append(kwargs) result = execute_ipi( driver_kwargs, definition.server_command(), env_vars=definition.env_vars, + bash_template=context.bash_template, inputs=inputs, outputs=outputs, parsl_resource_specification=definition.wq_resources(1), diff --git a/psiflow/sampling/output.py b/psiflow/sampling/output.py index 318a86c7..7d74aec7 100644 --- a/psiflow/sampling/output.py +++ b/psiflow/sampling/output.py @@ -1,5 +1,4 @@ -import copy -import re +import logging from enum import Enum from pathlib import Path from dataclasses import dataclass, field, InitVar @@ -13,15 +12,14 @@ import psiflow from psiflow.data import Dataset -from psiflow.geometry import Geometry, NullState +from psiflow.geometry import Geometry from psiflow.hamiltonians import Hamiltonian, MixtureHamiltonian, Zero from psiflow.sampling.walker import Walker from psiflow.utils.io import save_npz -from psiflow.utils.apps import setup_logger from psiflow.utils.parse import get_task_name_id -logger = setup_logger(__name__) # logging per module +logger = logging.getLogger(__name__) DEFAULT_OBSERVABLES = [ diff --git a/psiflow/sampling/sampling.py b/psiflow/sampling/sampling.py index 4328ea3d..dcb8fa80 100644 --- a/psiflow/sampling/sampling.py +++ b/psiflow/sampling/sampling.py @@ -1,6 +1,7 @@ import math import xml.etree.ElementTree as ET from dataclasses import dataclass +from itertools import cycle from typing import Optional, Union, Iterable import parsl @@ -8,6 +9,7 @@ from parsl.app.app import bash_app from parsl.data_provider.files import File from parsl.dataflow.futures import AppFuture, DataFuture +from sympy import print_glsl import psiflow from psiflow.data import Dataset @@ -19,10 +21,10 @@ potential_component_name, HamiltonianComponent, ) +from psiflow.utils.parse import format_env_vars from psiflow.sampling.utils import create_xml_list from psiflow.sampling.walker import Coupling, Walker, partition, Ensemble from psiflow.utils.io import _save_xml -from psiflow.utils import TMP_COMMAND, CD_COMMAND, export_env_command from psiflow.sampling.driver import __file__ as PATH_DRIVER @@ -113,7 +115,7 @@ def setup_sockets(components: list[HamiltonianComponent]) -> list[ET.Element]:
{address} """ - timeout = 60 * psiflow.context().definitions["ModelEvaluation"].timeout + timeout = psiflow.context().definitions["ModelEvaluation"].timeout sockets = [] for comp in components: @@ -362,6 +364,42 @@ def setup_smotion( return smotion +def define_clients_n_kwargs( + walkers: list[Walker], + components: list[HamiltonianComponent], + definition: "ModelEvaluation", + defaults: dict, +) -> tuple[list[dict], int]: + """Figure out i-Pi MD driver (force evaluator) configuration. + How many clients with which arguments on which resources?""" + + # separate hamiltonian components by computational cost + cheap, expensive = {}, {} + for i, comp in enumerate(components): + # the "idx" key corresponds with a serialized function in app inputs + if isinstance(comp.hamiltonian, MACEHamiltonian): + expensive[i] = comp + else: + cheap[i] = comp + + # cheap drivers only get a single client + cheap_kwargs = [] + for i, comp in cheap.items(): + cheap_kwargs.append(defaults | {"idx": i, "address": comp.address}) + + # expensive drivers are assigned to clients by ModelEvaluation + # TODO: currently there is no distinction between global MLPs (for every system) and + # bias MLPs (for a few systems in the total simulation), possibly leading to load balancing problems + n_systems = int(sum([w.nbeads for w in walkers])) + expensive_kwargs = definition.get_driver_resources(n_systems, len(expensive)) + driver_iterator = cycle(expensive.items()) + for kwargs, (i, comp) in zip(expensive_kwargs, driver_iterator): + # TODO: should dtype be configurable? + kwargs |= defaults | {"idx": i, "address": comp.address, "dtype": "float32"} + + return cheap_kwargs + expensive_kwargs, len(expensive_kwargs) + + def make_server_command( command: str, input_xml: File, @@ -392,6 +430,7 @@ def make_driver_commands( driver_kwargs: list[dict], file_xyz: File, files_hamiltonian: list[File] ) -> list[str]: """""" + # TODO: what if 'file_xyz' contains multiple geometries of different size? assert len(driver_kwargs) >= len(files_hamiltonian) default = f'i-pi-driver-py -u -S "" -m custom -P {PATH_DRIVER} -a {{address}} -o {{options}} &' @@ -424,6 +463,7 @@ def _execute_ipi( command_server: str, *plumed_list: str, env_vars: dict = {}, + bash_template: str = "", stdout: str = parsl.AUTO_LOGNAME, stderr: str = parsl.AUTO_LOGNAME, inputs: list = [], @@ -450,10 +490,7 @@ def _execute_ipi( commands_driver = make_driver_commands(driver_kwargs, file_xyz_in, files_in) command_list = [ - TMP_COMMAND, - CD_COMMAND, - "\n".join(write_command_args), - export_env_command(env_vars), + *write_command_args, command_start, command_wait, *commands_driver, @@ -461,7 +498,9 @@ def _execute_ipi( ] if coupling_command: command_list.append(coupling_command) - return "\n".join(command_list) + + commands, env = "\n".join(command_list), format_env_vars(env_vars) + return bash_template.format(commands=commands, env=env) execute_ipi = bash_app(_execute_ipi, executors=["ModelEvaluation"]) @@ -548,29 +587,14 @@ def _sample( # app setup and IO context = psiflow.context() - definition = context.definitions["ModelEvaluation"] input_file = context.new_file("input_", ".xml") _save_xml(simulation, outputs=[input_file]) inputs = [ input_file, Dataset([w.state for w in walkers]).extxyz, + *[c.hamiltonian.serialize_function() for c in hamiltonian_components], ] - # figure out i-Pi MD driver configuration - # how many drivers (force evaluators) with which arguments? - # remove any Harmonic instances because they are not implemented with sockets -- TODO: why? - max_nclients = int(sum([w.nbeads for w in walkers])) - driver_kwargs = [] - for i, comp in enumerate(hamiltonian_components): - inputs.append(comp.hamiltonian.serialize_function()) - kwargs = {"idx": i, "address": comp.address, "max_force": max_force} - if isinstance(comp.hamiltonian, MACEHamiltonian): - kwargs["dtype"] = "float32" # TODO: should this be configurable? - for instance_kwargs in definition.get_driver_devices(max_nclients): - driver_kwargs.append(kwargs | instance_kwargs) - else: - driver_kwargs.append(kwargs) - outputs = [context.new_file("data_", ".xyz")] outputs += [context.new_file("simulation_", ".txt") for _ in walkers] if keep_trajectory: @@ -590,6 +614,11 @@ def _sample( else: coupling_copy_command = None + definition = context.definitions["ModelEvaluation"] + driver_kwargs, n_clients = define_clients_n_kwargs( + walkers, hamiltonian_components, definition, {"max_force": max_force} + ) + # TODO: an app to check for valid input? (e.g., PBC + barostat) result = execute_ipi( len(walkers), @@ -599,9 +628,10 @@ def _sample( definition.server_command(), *plumed_list, # futures env_vars=dict(definition.env_vars), + bash_template=context.bash_template, inputs=inputs, outputs=outputs, - parsl_resource_specification=definition.wq_resources(max_nclients), + parsl_resource_specification=definition.wq_resources(n_clients), ) # process MD output diff --git a/psiflow/sampling/server.py b/psiflow/sampling/server.py index 2fe3c71c..d0bc4e51 100644 --- a/psiflow/sampling/server.py +++ b/psiflow/sampling/server.py @@ -97,7 +97,6 @@ def run(start_xyz: str, input_xml: str): # prepare starting geometries from context_dir data_start: list[ase.Atoms] = read(start_xyz, index=":") for i, at in enumerate(data_start): - print(at.pbc) if not any(at.pbc): # set fake large cell for i-PI at.pbc = True at.cell = Cell(NONPERIODIC_CELL) @@ -188,7 +187,7 @@ def main(): run(args.start_xyz, args.input_xml) softexit.trigger(status="success", message="@PSIFLOW: We are done here.") except ConnectionError: - # TODO: in this case, no output files are generated.. + # TODO: in this case, no output files are generated, so the task fails.. traceback.print_exc() softexit.trigger(status="bad", message="@PSIFLOW: Clients failed to connect.") except np.linalg.LinAlgError: diff --git a/psiflow/sampling/walker.py b/psiflow/sampling/walker.py index 0b2cdec6..42c711d5 100644 --- a/psiflow/sampling/walker.py +++ b/psiflow/sampling/walker.py @@ -14,7 +14,8 @@ from psiflow.data import Dataset from psiflow.geometry import Geometry, check_equality from psiflow.hamiltonians import Hamiltonian, Zero, combine_hamiltonians -from psiflow.order_parameters import OrderParameter + +# from psiflow.order_parameters import OrderParameter from psiflow.sampling.metadynamics import Metadynamics from psiflow.utils.apps import copy_app_future @@ -68,7 +69,7 @@ def get_ensemble_kwargs(walker: "Walker") -> dict: @dataclass class Walker: start: Union[Geometry, AppFuture] - hamiltonian: Hamiltonian = Zero() + hamiltonian: Hamiltonian = field(default_factory=lambda: Zero()) timestep: float = 0.5 temperature: Optional[float] = 300 pressure: Optional[float] = None @@ -78,7 +79,7 @@ class Walker: masses: Union[np.ndarray, float, None] = None nbeads: int = 1 metadynamics: Optional[Metadynamics] = None - order_parameter: Optional[OrderParameter] = None + # order_parameter: Optional['OrderParameter'] = None state: Union[Geometry, AppFuture] = field(init=False) coupling: Optional[Coupling] = field(init=False) @@ -96,9 +97,9 @@ def __post_init__(self): # we cannot check this for futures assert self.pressure is None, "Pressure requires PBC" - if self.order_parameter is not None: - # TODO: order_parameter out of commission - self.start = self.order_parameter.evaluate(self.start) + # if self.order_parameter is not None: + # # TODO: order_parameter out of commission + # self.start = self.order_parameter.evaluate(self.start) if (m := self.masses) is None: pass # do nothing diff --git a/psiflow/serialization.py b/psiflow/serialization.py index c37eea69..ec455b02 100644 --- a/psiflow/serialization.py +++ b/psiflow/serialization.py @@ -4,6 +4,7 @@ import json from pathlib import Path from typing import ClassVar, Optional, Union, get_args, get_origin, get_type_hints +from collections.abc import Sequence from dataclasses import InitVar import typeguard @@ -81,18 +82,18 @@ def serializable(cls): origin = get_origin(type_hint) if origin is ClassVar: continue # do nothing for classvars - elif origin == dict: + elif origin in (dict, Sequence): kind = "attrs" elif isinstance(type_hint, str) and type_hint.startswith("dataclasses"): continue elif isinstance(type_hint, InitVar): continue - if kind is None and not inspect.isclass(type_hint): + elif kind is not None and not inspect.isclass(type_hint): raise ValueError( "{} is formally not a class ({})".format(type_hint, name) ) - if issubclass(type_hint, Serializable): + elif issubclass(type_hint, Serializable): kind = "serial" elif type_hint is Geometry: kind = "geoms" @@ -279,7 +280,7 @@ def deserialize(data_str: str, custom_cls: Optional[list] = None): from psiflow.learning import Learning from psiflow.metrics import Metrics from psiflow.models import MACE - from psiflow.order_parameters import OrderParameter + # from psiflow.order_parameters import OrderParameter from psiflow.reference import CP2K, GPAW, ORCA, ReferenceDummy from psiflow.sampling import Metadynamics, ReplicaExchange, SimulationOutput, Walker @@ -300,7 +301,7 @@ def deserialize(data_str: str, custom_cls: Optional[list] = None): Harmonic, MixtureHamiltonian, Metadynamics, - OrderParameter, + # OrderParameter, ReplicaExchange, SimulationOutput, Walker, diff --git a/psiflow/utils/__init__.py b/psiflow/utils/__init__.py index e103e43e..e69de29b 100644 --- a/psiflow/utils/__init__.py +++ b/psiflow/utils/__init__.py @@ -1,8 +0,0 @@ -TMP_COMMAND = 'tmpdir=$(mktemp -d -p /tmp "mytmpdir.XXXXXXXXXX" || mktemp -d -t "mytmpdir.XXXXXXXXXX")' -CD_COMMAND = 'cd $tmpdir; echo "tmpdir: $PWD"' - - -def export_env_command(env_vars: dict) -> str: - return "export " + " ".join( - [f"{name}={value}" for name, value in env_vars.items()] - ) diff --git a/psiflow/utils/_plumed.py b/psiflow/utils/_plumed.py index 89b67890..611bdc8e 100644 --- a/psiflow/utils/_plumed.py +++ b/psiflow/utils/_plumed.py @@ -1,10 +1,7 @@ import logging import os -import typeguard - -@typeguard.typechecked def try_manual_plumed_linking() -> str: if "PLUMED_KERNEL" not in os.environ.keys(): # try linking manually @@ -23,7 +20,6 @@ def try_manual_plumed_linking() -> str: return os.environ["PLUMED_KERNEL"] -@typeguard.typechecked def remove_comments_printflush(plumed_input: str) -> str: new_input = [] for line in list(plumed_input.split("\n")): @@ -38,7 +34,6 @@ def remove_comments_printflush(plumed_input: str) -> str: return "\n".join(new_input) -@typeguard.typechecked def set_path_in_plumed(plumed_input: str, keyword: str, path_to_set: str) -> str: lines = plumed_input.split("\n") for i, line in enumerate(lines): diff --git a/psiflow/utils/apps.py b/psiflow/utils/apps.py index 0f52bdd4..773915ae 100644 --- a/psiflow/utils/apps.py +++ b/psiflow/utils/apps.py @@ -1,23 +1,20 @@ -from __future__ import annotations # necessary for type-guarding class methods - -import logging -import sys +import shutil +import textwrap from typing import Any, Union +from pathlib import Path import numpy as np -import typeguard -from parsl.app.app import python_app +from parsl import python_app from parsl.data_provider.files import File -@typeguard.typechecked def get_attribute(obj: Any, *attribute_names: str) -> Any: + # TODO: not an app for name in attribute_names: obj = getattr(obj, name) return obj -@typeguard.typechecked def _boolean_or(*args: Union[bool, np.bool_]) -> bool: return any(args) @@ -32,27 +29,6 @@ def _multiply(a, b): multiply = python_app(_multiply, executors=["default_threads"]) -@typeguard.typechecked -def setup_logger(module_name: str, level=logging.INFO) -> logging.Logger: - # Create logger instance for the module - module_logger = logging.getLogger(module_name) - - # Set the desired format string - formatter = logging.Formatter("%(name)s - %(message)s") - - # Create handler to send logs to stdout - stdout_handler = logging.StreamHandler(sys.stdout) - stdout_handler.setFormatter(formatter) - - # Add handler to the logger instance - module_logger.addHandler(stdout_handler) - - # Set the logging level for the logger - module_logger.setLevel(level) - - return module_logger - - def _compute_sum(a, b): return np.add(a, b) @@ -60,37 +36,25 @@ def _compute_sum(a, b): compute_sum = python_app(_compute_sum, executors=["default_threads"]) -@typeguard.typechecked -def _combine_futures(inputs: list[Any]) -> list[Any]: - return list(inputs) - - -combine_futures = python_app(_combine_futures, executors=["default_threads"]) - - -@typeguard.typechecked def _copy_data_future( pass_on_exist: bool = False, inputs: list[File] = [], outputs: list[File] = [], ) -> None: - import shutil - from pathlib import Path - assert len(inputs) == 1 assert len(outputs) == 1 if Path(outputs[0]).is_file() and pass_on_exist: - return None - if Path(inputs[0]).is_file(): + pass + elif Path(inputs[0]).is_file(): shutil.copyfile(inputs[0], outputs[0]) else: # no need to copy empty file pass + return copy_data_future = python_app(_copy_data_future, executors=["default_threads"]) -@typeguard.typechecked def _copy_app_future(future: Any, inputs: list = [], outputs: list = []) -> Any: # inputs/outputs to enforce additional dependencies from copy import deepcopy @@ -101,7 +65,6 @@ def _copy_app_future(future: Any, inputs: list = [], outputs: list = []) -> Any: copy_app_future = python_app(_copy_app_future, executors=["default_threads"]) -@typeguard.typechecked def _log_message(logger, message, *futures): if len(futures) > 0: logger.info(message.format(*futures)) @@ -112,23 +75,12 @@ def _log_message(logger, message, *futures): log_message = python_app(_log_message, executors=["default_threads"]) -def _pack(*args): - return args # TODO: _combine_futures? - - -pack = python_app(_pack, executors=["default_threads"]) - - -@typeguard.typechecked -def _unpack_i(result: Union[np.ndarray, list, tuple], i: int) -> Any: - assert i <= len(result) - return result[i] - +@python_app(executors=["default_threads"]) +def pack(*args: Any) -> tuple[Any]: + """Combine passed futures into a single future.""" + return args -unpack_i = python_app(_unpack_i, executors=["default_threads"]) - -@typeguard.typechecked def _concatenate(*arrays: np.ndarray) -> np.ndarray: return np.concatenate(arrays) @@ -136,9 +88,28 @@ def _concatenate(*arrays: np.ndarray) -> np.ndarray: concatenate = python_app(_concatenate, executors=["default_threads"]) -@typeguard.typechecked def _isnan(a: Union[float, np.ndarray]) -> bool: return bool(np.any(np.isnan(a))) isnan = python_app(_isnan, executors=["default_threads"]) + + +def create_bash_template(tmpdir_root: str, keep_tmpdirs: bool) -> str: + """Create general wrapper for all bash apps. The exitcode ensures that every app completes successfully.""" + template = f""" + # Create and move into new tmpdir for app execution + tmpdir=$(mktemp -d -p {tmpdir_root} "psiflow-tmp.XXXXXXXXXX") + cd $tmpdir; echo "tmpdir: $PWD" + {{env}} + printenv + + # Actual app definition goes here + {{commands}} + + # Cleanup + {'cd ../.. && rm -r $tmpdir' if not keep_tmpdirs else ''} + exit 0 + """ + return textwrap.dedent(template) + diff --git a/psiflow/utils/config.py b/psiflow/utils/config.py new file mode 100644 index 00000000..59e140b5 --- /dev/null +++ b/psiflow/utils/config.py @@ -0,0 +1,23 @@ +PSIFLOW_INTERNAL = "psiflow_internal" +PARSL_LOGFILE = "parsl.log" +PSIFLOW_LOGFILE = "psiflow.log" +CONTEXT_DIR = "context_dir" + + +DEFAULT_CONFIG = """ +parsl_log_level: WARNING +psiflow_log_level: WARNING +usage_tracking: 3 +default_threads: 8 +tmpdir_root: /tmp +keep_tmpdirs: false + +ModelEvaluation: + executor: threadpool + max_threads: 2 + +ModelTraining: + executor: threadpool + max_threads: 2 +""" + diff --git a/psiflow/utils/io.py b/psiflow/utils/io.py index c5e93855..2123429a 100644 --- a/psiflow/utils/io.py +++ b/psiflow/utils/io.py @@ -3,12 +3,10 @@ from typing import Any import numpy as np -import typeguard from parsl.app.app import python_app from parsl.data_provider.files import File -@typeguard.typechecked def _save_yaml( input_dict: dict, outputs: list[File] = [], @@ -39,7 +37,6 @@ def _make_dict_safe(arg): save_yaml = python_app(_save_yaml, executors=["default_threads"]) -@typeguard.typechecked def _save_xml( element: ET.Element, outputs: list = [], @@ -52,7 +49,6 @@ def _save_xml( save_xml = python_app(_save_xml, executors=["default_threads"]) -@typeguard.typechecked def _load_numpy(inputs: list[File] = [], **kwargs) -> np.ndarray: return np.loadtxt(inputs[0], **kwargs) @@ -60,8 +56,7 @@ def _load_numpy(inputs: list[File] = [], **kwargs) -> np.ndarray: load_numpy = python_app(_load_numpy, executors=["default_threads"]) -@typeguard.typechecked -def _read_yaml(inputs: list[File] = [], outputs: list[File] = []) -> dict: +def _read_yaml(inputs: list[File] = []) -> dict: import yaml with open(inputs[0], "r") as f: @@ -72,7 +67,6 @@ def _read_yaml(inputs: list[File] = [], outputs: list[File] = []) -> dict: read_yaml = python_app(_read_yaml, executors=["default_threads"]) -@typeguard.typechecked def _save_txt(data: str, outputs: list[File] = []) -> None: with open(outputs[0], "w") as f: f.write(data) @@ -81,7 +75,6 @@ def _save_txt(data: str, outputs: list[File] = []) -> None: save_txt = python_app(_save_txt, executors=["default_threads"]) -@typeguard.typechecked def _load_metrics(inputs: list = []) -> np.recarray: # TODO: stop using recarrays return np.load(inputs[0], allow_pickle=True) @@ -90,7 +83,6 @@ def _load_metrics(inputs: list = []) -> np.recarray: load_metrics = python_app(_load_metrics, executors=["default_threads"]) -@typeguard.typechecked def _save_metrics(data: np.recarray, outputs: list = []) -> None: # TODO: stop using recarrays with open(outputs[0], "wb") as f: @@ -100,7 +92,6 @@ def _save_metrics(data: np.recarray, outputs: list = []) -> None: save_metrics = python_app(_save_metrics, executors=["default_threads"]) -@typeguard.typechecked def _dump_json( inputs: list = [], outputs: list = [], diff --git a/psiflow/utils/logging.py b/psiflow/utils/logging.py new file mode 100644 index 00000000..373a6134 --- /dev/null +++ b/psiflow/utils/logging.py @@ -0,0 +1,41 @@ +import logging +from pathlib import Path + +import parsl + + +def setup_logging(file: Path, level=logging.INFO) -> None: + """Setup the Psiflow parent logger""" + logger = logging.getLogger('psiflow') + logger.setLevel(level) + logger.propagate = False # do not propagate messages to root logger + + fh = logging.FileHandler(file) + formatter = logging.Formatter( + fmt='%(asctime)s [%(levelname)s] %(name)s \t %(message)s', + datefmt='%Y-%m-%d %H:%M' + ) + fh.setFormatter(formatter) + logger.addHandler(fh) + + +def log_dfk_tasks(verbose: bool = False): + """Get an overview of all tasks stored in the parsl DFK. For debugging purposes.""" + dfk = parsl.dfk() + parsl.wait_for_current_tasks() + log = ["- Parsl task overview -"] + if not verbose: + log += [f"{i}\t{d['func_name']}" for i, d in dfk.tasks.items()] + log.append("- Parsl task overview -") + print(*log, sep="\n") + return + + for i, d in dfk.tasks.items(): + args = [(_.split("/")[-1] if isinstance(_, str) else _) for _ in d["args"]] + if "inputs" in (kwargs := d["kwargs"]): + kwargs["inputs"] = [f.filename for f in kwargs["inputs"]] + if "outputs" in kwargs: + kwargs["outputs"] = [f.filename for f in kwargs["outputs"]] + log.append(f"\n{i}\t{d['func_name']:<30}\n{args}\n{kwargs}") + log.append("- Parsl task overview -") + print(*log, sep="\n") \ No newline at end of file diff --git a/psiflow/utils/parse.py b/psiflow/utils/parse.py index 99fbcd72..9f265d18 100644 --- a/psiflow/utils/parse.py +++ b/psiflow/utils/parse.py @@ -7,9 +7,7 @@ class LineNotFoundError(Exception): - """Call to find_line failed""" - - pass + pass # call to find_line failed def find_line( @@ -24,7 +22,8 @@ def find_line( idx_slice = slice(idx_start, idx_start + max_lines) else: idx_start = idx_start or len(lines) - 1 - idx_slice = slice(idx_start, idx_start - max_lines, -1) + idx_stop = max(idx_start - max_lines, 0) + idx_slice = slice(idx_start, idx_stop, -1) for i, l in enumerate(lines[idx_slice]): if l.strip().startswith(line): if not reverse: @@ -41,8 +40,12 @@ def lines_to_array( return np.array([line.split()[start:stop] for line in lines], dtype=dtype) -def string_to_timedelta(timedelta: str) -> datetime.timedelta: +def str_to_timedelta(s: str) -> datetime.timedelta: """""" + # TODO: this will probably not work in general + time = datetime.datetime.strptime(s, "%H:%M:%S") + return datetime.timedelta(hours=time.hour, minutes=time.minute, seconds=time.second) + allowed_units = "weeks", "days", "hours", "minutes", "seconds" time_list = timedelta.split() values, units = time_list[:-1:2], time_list[1::2] @@ -50,9 +53,10 @@ def string_to_timedelta(timedelta: str) -> datetime.timedelta: return datetime.timedelta(**kwargs) + def get_task_logs(task_id: int) -> tuple[Path, Path]: """""" - path = Path.cwd().resolve() / PSIFLOW_INTERNAL / "000/task_logs" # TODO + path = Path.cwd().resolve() / PSIFLOW_INTERNAL / "task_logs" stdout = next(path.rglob(f"task_{task_id}_*.stdout")) stderr = next(path.rglob(f"task_{task_id}_*.stderr")) return stdout, stderr @@ -61,3 +65,11 @@ def get_task_logs(task_id: int) -> tuple[Path, Path]: def get_task_name_id(logfile: str) -> tuple[str, str]: _, task_id, task_name = Path(logfile).stem.split("_", maxsplit=2) return task_name, task_id + + +def format_env_vars(env_vars: dict) -> str: + if len(env_vars) == 0: + return "" + return "export" + " ".join([f"{k}={v}" for k, v in env_vars.items()]) + + diff --git a/psiflow/utils/wq.py b/psiflow/utils/wq.py new file mode 100644 index 00000000..4ecc01d4 --- /dev/null +++ b/psiflow/utils/wq.py @@ -0,0 +1,40 @@ +# TODO: this probably does not work in a nested way + +import logging + + +logger = logging.getLogger(__name__) # logging per module + + +WQ_RESOURCES_REGISTRY = [] + + +def register_definition(definition: 'ExecutionDefinition') -> None: + """""" + if (spec := definition.spec) is None: + return # threadpool does not have priority + + WQ_RESOURCES_REGISTRY.append((definition.name, spec)) + spec["priority"] = SetWQPriority.default + + +class SetWQPriority: + """Manage the WQ priority tag as context manager""" + default = 0 + + def __init__(self, value: int, verbose: bool = False) -> None: + self.value = value + self.verbose = verbose + + def __enter__(self): + if self.verbose: + logger.info(f"SetWQPriority setting priority:\t{self.value}") + for n, spec in WQ_RESOURCES_REGISTRY: + spec["priority"] = self.value + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.verbose: + logger.info(f"SetWQPriority unsetting {self.value}") + for n, spec in WQ_RESOURCES_REGISTRY: + spec["priority"] = SetWQPriority.default diff --git a/pyproject.toml b/pyproject.toml index 1f20c233..61df7a51 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "psiflow" -version = "4.0.1" +version = "4.0.2" description = "Library for developing interatomic potentials" readme = "README.md" requires-python = ">=3.10" @@ -13,12 +13,11 @@ dependencies = [ "ase>=3.23.0", "pyyaml>=6.0", "numpy>=1.22.3, <2", - "parsl==2024.12.16", + "parsl==2026.02.23", "prettytable", "psutil", "cp2k-input-tools @ git+https://github.com/cp2k/cp2k-input-tools.git@3b9929735dcb3c8c0620a548b1fe20efecbad077", # need 2024.1 "ipi @ git+https://github.com/i-pi/i-pi.git@v3.1.10", - "pytimeparse", ] diff --git a/tests/conftest.py b/tests/conftest.py index e09e86e0..2052219c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,3 @@ -import xml.etree.ElementTree as ET from dataclasses import asdict from pathlib import Path @@ -44,7 +43,6 @@ def context(request, tmp_path_factory): path_config = Path(request.config.getoption("--psiflow-config")) with open(path_config, "r") as f: psiflow_config = yaml.safe_load(f) - psiflow_config["path"] = tmp_path_factory.mktemp("psiflow_internal") psiflow.load(psiflow_config) context = psiflow.context() # noqa: F841 yield diff --git a/tests/test_execution.py b/tests/test_execution.py new file mode 100644 index 00000000..6a456d9c --- /dev/null +++ b/tests/test_execution.py @@ -0,0 +1,59 @@ +import yaml + +from psiflow.execution import ExecutionDefinition + + +def test_execution(): + """Check a few execution parameters. This could definitely be expanded upon.""" + data_yaml = """ + executor: threadpool + max_threads: 42 + use_gpu: false + max_runtime: 00:59:00 + local: + cores: 4 + memory: 8 + """ + definition = ExecutionDefinition.from_config(**yaml.safe_load(data_yaml)) + assert definition.executor_type == "threadpool" + assert definition.provider is None + assert definition.container is None + assert definition.lifetime == float("inf") + assert definition.max_runtime == 3540 + assert definition.task_slots == 42 + assert definition.cores_per_task == 1 + assert definition.use_gpu == False + assert definition.spec is None + + data_yaml = """ + cores_per_task: 3 + gpus_per_task: 0 + mem_per_task: 6 + min_runtime: 00:15:00 + env_vars: + CUSTOM_KEY: custom_var + slurm: + cores_per_node: 8 + mem_per_node: 16 + walltime: "02:00:00" + """ + spec = { + "cores": 3, + "disk": 0, + "gpus": 0, + "memory": 6000, + "priority": 0, + "running_time_min": 900, + } + data = yaml.safe_load(data_yaml) + definition = ExecutionDefinition.from_config(**data) + assert definition.executor_type == "workqueue" + assert definition.provider is not None + assert definition.lifetime == 7200 + assert definition.max_runtime == 7140 + assert definition.task_slots == 2 + assert definition.cores_per_task == 3 + assert definition.use_gpu == False + assert definition.spec == spec + assert definition.env_vars["CUSTOM_KEY"] == "CUSTOM_VAR" + diff --git a/tests/test_learning.py b/tests/test_learning.py index e450a978..430a6ca4 100644 --- a/tests/test_learning.py +++ b/tests/test_learning.py @@ -9,7 +9,7 @@ from psiflow.metrics import Metrics, _create_table, parse_walker_log, reconstruct_dtypes from psiflow.reference import ReferenceDummy from psiflow.sampling import SimulationOutput, Walker -from psiflow.utils.apps import combine_futures +# from psiflow.utils.apps import combine_futures # use pack instead from psiflow.utils.io import _load_metrics, _save_metrics, load_metrics, save_metrics diff --git a/tests/test_reference.py b/tests/test_reference.py index 8fa5b09e..a1be636f 100644 --- a/tests/test_reference.py +++ b/tests/test_reference.py @@ -146,11 +146,10 @@ def test_cp2k_parse_output(): ENERGY| Total FORCE_EVAL ( QS ) energy [a.u.]: -14.202993407031412 - ATOMIC FORCES in [a.u.] - - # Atom Kind Element X Y Z - 1 1 O 0.00000000 0.00000000 0.00000000 - SUM OF ATOMIC FORCES 0.00000000 0.00000000 0.00000000 0.00000000 + FORCES| Atomic forces [hartree/bohr] + FORCES| Atom x y z |f| + FORCES| 1 0 0 0 0 + FORCES| Sum 0.00000000 0.00000000 0.00000000 0.00000000 STRESS| Analytical stress tensor [GPa] STRESS| x y z @@ -169,7 +168,7 @@ def test_cp2k_parse_output(): ### SKIPPED A BIT ### - ------------------------------------------------------------------------------- + ------------------------------------------------------------------------------- - - - T I M I N G - - - @@ -223,7 +222,7 @@ def test_cp2k_success(simple_cp2k_input, geom_h2_p): if "Number of threads for this process" in line: nthreads = int(line.split()[-1]) definition = psiflow.context().definitions["CP2K"] - ncores = definition.cores_per_worker + ncores = definition.cores_per_task assert ncores == nprocesses assert 1 == nthreads @@ -306,7 +305,6 @@ def test_cp2k_failure(geom_h2_p): def test_cp2k_memory(simple_cp2k_input): - # TODO: test_cp2k_memory == test_cp2k_timeout until memory constraints work reference = CP2K(simple_cp2k_input) geometry = Geometry.from_data( numbers=np.ones(4000), diff --git a/tests/test_sampling.py b/tests/test_sampling.py index 7d955c6a..a502a6af 100644 --- a/tests/test_sampling.py +++ b/tests/test_sampling.py @@ -317,7 +317,7 @@ def test_output_status(dataset): # walltime definition = psiflow.context().definitions["ModelEvaluation"] - definition.max_simulation_time = 5 / 60 # 5 seconds + definition.max_runtime = 5 # seconds outputs = sample([walker], steps=10000) assert outputs[0].status.result() == Status.TIMEOUT assert outputs[0].time.result() > 0