From 2f9379e441a35c403caf55795916f11602b4cdd2 Mon Sep 17 00:00:00 2001 From: James Byrne Date: Tue, 27 May 2025 21:18:03 +0100 Subject: [PATCH 01/19] Filtering non-existent files in configuration --- preprocess_toolbox/processor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/preprocess_toolbox/processor.py b/preprocess_toolbox/processor.py index 4aa9778..56a3e55 100644 --- a/preprocess_toolbox/processor.py +++ b/preprocess_toolbox/processor.py @@ -375,7 +375,8 @@ def _process_channel(self, for split, var_files in self.source_files.items() for vn, files in var_files.items() for file in files - if var_name == vn]))) + if var_name == vn + and os.path.exists(file)]))) if len(source_files) > 0: logging.info("Opening {} files for {}".format(len(source_files), var_name)) From 0189534500e3320cc3826f0b401f8212d6000d88 Mon Sep 17 00:00:00 2001 From: James Byrne Date: Wed, 28 May 2025 12:53:13 +0100 Subject: [PATCH 02/19] Removing further crazy debugging --- preprocess_toolbox/base.py | 2 +- preprocess_toolbox/interface.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/preprocess_toolbox/base.py b/preprocess_toolbox/base.py index 4c74da3..2f2417d 100644 --- a/preprocess_toolbox/base.py +++ b/preprocess_toolbox/base.py @@ -144,7 +144,7 @@ def get_dataset(self, for var_filepaths in self.processed_files[vn].values()] logging.info("Got {} filenames to open dataset with!".format(len(var_files))) - logging.debug(pformat(var_files)) + # logging.debug(pformat(var_files)) # TODO: where's my parallel mfdataset please!? with (dask.config.set(**{'array.slicing.split_large_chunks': True})): diff --git a/preprocess_toolbox/interface.py b/preprocess_toolbox/interface.py index cddc5e3..b2552b1 100644 --- a/preprocess_toolbox/interface.py +++ b/preprocess_toolbox/interface.py @@ -34,7 +34,6 @@ def get_processor_implementation(config: os.PathLike) -> object: create_kwargs = dict(**remaining) logging.info("Attempting to instantiate {} with loaded configuration".format(implementation)) - logging.debug("Converted kwargs from the retrieved configuration: {}".format(create_kwargs)) return implementation(**create_kwargs) @@ -57,7 +56,6 @@ def get_processor_from_source(identifier: str, source_cfg: dict) -> object: create_kwargs = {k: v for k, v in source_cfg.items() if k not in ["dataset_config", "implementation"]} logging.info("Attempting to instantiate {} with loaded configuration".format(source_cfg["implementation"])) - logging.debug("Converted kwargs from the retrieved configuration: {}".format(create_kwargs)) return get_implementation(source_cfg["implementation"])( get_dataset_config_implementation(source_cfg["dataset_config"]), From cec39cc8952128856131d30e54527505e84b1777 Mon Sep 17 00:00:00 2001 From: James Byrne Date: Thu, 29 May 2025 00:38:27 +0100 Subject: [PATCH 03/19] Updating to ensure dates are month ends when extending ranges --- preprocess_toolbox/utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/preprocess_toolbox/utils.py b/preprocess_toolbox/utils.py index 5ecc4dc..5d06aca 100644 --- a/preprocess_toolbox/utils.py +++ b/preprocess_toolbox/utils.py @@ -6,8 +6,9 @@ from dateutil.relativedelta import relativedelta import orjson +import pandas as pd -from download_toolbox.interface import DatasetConfig +from download_toolbox.interface import DatasetConfig, Frequency def get_config(config_path: os.PathLike): @@ -44,6 +45,9 @@ def get_extension_dates(ds_config: DatasetConfig, op = operator.sub if reverse else operator.add extended_date = op(date, relativedelta(**attrs)) + if ds_config.frequency.attribute == Frequency.MONTH: + extended_date = extended_date + pd.offsets.MonthEnd(0) + if extended_date not in dates: if all([os.path.exists(ds_config.var_filepath(var_config, [extended_date])) for var_config in ds_config.variables]): From 143e8f8cd1c44a01568b481134afc7db6170159c Mon Sep 17 00:00:00 2001 From: James Byrne Date: Fri, 30 May 2025 10:22:10 +0100 Subject: [PATCH 04/19] Removing concatenation on time dimension as will produce duplicates and thus non-monotonic indexes in some situations --- preprocess_toolbox/processor.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/preprocess_toolbox/processor.py b/preprocess_toolbox/processor.py index 56a3e55..2a1d121 100644 --- a/preprocess_toolbox/processor.py +++ b/preprocess_toolbox/processor.py @@ -261,7 +261,6 @@ def _init_source_data(self, for var_name, var_files in self._source_files[split].items(): logging.info("Got {} files for {}:{}".format(len(var_files), split, var_name)) - logging.debug(pformat(self._source_files)) def _normalise_array_mean(self, var_name: str, da: object, denormalise: bool=False): """ @@ -341,7 +340,6 @@ def _normalise_array_scaling(self, var_name: str, da: object, denormalise: bool= elif self.norm_split_dates: logging.debug("Generating norm-scaling min-max from {} training " "dates".format(len(self.norm_split_dates))) - norm_samples = da.sel(time=self.norm_split_dates).data norm_samples = norm_samples.ravel() @@ -380,19 +378,15 @@ def _process_channel(self, if len(source_files) > 0: logging.info("Opening {} files for {}".format(len(source_files), var_name)) + logging.debug("Files to be opened:\n{}".format(pformat(source_files))) # In the old IceNet library there was dubiousness about the source of the # data so this was harder. Now we work with whatever we get from download-toolbox ds = xr.open_mfdataset( source_files, - # Solves issue with inheriting files without - # time dimension (only having coordinate) combine="nested", - concat_dim="time", coords="minimal", compat="override", - # TODO: review this, but if lat-lon is in the file, it's signalling bigger issues - # drop_variables=("lat", "lon"), parallel=self._parallel) da = getattr(ds, var_name) da = da.astype(self.dtype) @@ -400,7 +394,6 @@ def _process_channel(self, # FIXME: we should ideally store train dates against the # normalisation and climatology, to ensure recalculation on # reprocess. All this need be is in the path, to be honest - if var_suffix == "anom": if len(self._anom_clim_splits) < 1 and self._refdir is None: raise ProcessingError("You must provide a list of splits via " @@ -589,7 +582,6 @@ def lead_time(self) -> int: @property def norm_split_dates(self): - # TODO: functools.cached_property, though slightly odd behaviour re. write-ability return [date for clim_split in self._normalisation_splits for date in self._splits[clim_split]] From 3f1c25d6e6e8c798359e50b92cfb7693c2cd0787 Mon Sep 17 00:00:00 2001 From: James Byrne Date: Fri, 30 May 2025 11:54:14 +0100 Subject: [PATCH 05/19] Fixes #32: resolving the date checking in monthly handling --- preprocess_toolbox/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/preprocess_toolbox/utils.py b/preprocess_toolbox/utils.py index 5d06aca..fe59667 100644 --- a/preprocess_toolbox/utils.py +++ b/preprocess_toolbox/utils.py @@ -45,7 +45,7 @@ def get_extension_dates(ds_config: DatasetConfig, op = operator.sub if reverse else operator.add extended_date = op(date, relativedelta(**attrs)) - if ds_config.frequency.attribute == Frequency.MONTH: + if ds_config.frequency == Frequency.MONTH: extended_date = extended_date + pd.offsets.MonthEnd(0) if extended_date not in dates: @@ -58,6 +58,7 @@ def get_extension_dates(ds_config: DatasetConfig, logging.warning("{} will be dropped due to missing data {}". format(date, extended_date)) dropped_dates.append(date) + break return sorted(list(set(additional_dates))), sorted(list(set(dropped_dates))) From 6e9274c83ed7f42c6e45387cd180ba689b069205 Mon Sep 17 00:00:00 2001 From: James Byrne Date: Fri, 30 May 2025 12:08:16 +0100 Subject: [PATCH 06/19] Fixes #32: fixes resolutions monthly handling --- preprocess_toolbox/processor.py | 3 ++- preprocess_toolbox/utils.py | 9 +++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/preprocess_toolbox/processor.py b/preprocess_toolbox/processor.py index 2a1d121..b95969d 100644 --- a/preprocess_toolbox/processor.py +++ b/preprocess_toolbox/processor.py @@ -240,7 +240,8 @@ def _init_source_data(self, # Calculating lead and lag dates that aren't already accounted for in splits if self._lag_time > 0: logging.info("Including lag of {} {}s".format(self._lag_time, ds_config.frequency.attribute)) - additional_lag_dates, dropped_lag_dates = get_extension_dates(ds_config, dates, self._lag_time, reverse=True) + additional_lag_dates, dropped_lag_dates = get_extension_dates(ds_config, dates, self._lag_time + 1, + start_step=1, reverse=True) dates += additional_lag_dates drop_dates[split] += dropped_lag_dates logging.info("Lag added {} dates for {} category: {} - {}". diff --git a/preprocess_toolbox/utils.py b/preprocess_toolbox/utils.py index fe59667..cadc610 100644 --- a/preprocess_toolbox/utils.py +++ b/preprocess_toolbox/utils.py @@ -36,17 +36,18 @@ def get_config_filename(args: argparse.Namespace, prefix: str = "loader"): def get_extension_dates(ds_config: DatasetConfig, dates: list, num_steps: int, - reverse=False): + start_step: int = 0, + reverse: bool = False): additional_dates, dropped_dates = [], [] for date in dates: - for time in range(num_steps): - attrs = {"{}s".format(ds_config.frequency.attribute): time + 1} + for time in range(start_step, num_steps): + attrs = {"{}s".format(ds_config.frequency.attribute): time} op = operator.sub if reverse else operator.add extended_date = op(date, relativedelta(**attrs)) if ds_config.frequency == Frequency.MONTH: - extended_date = extended_date + pd.offsets.MonthEnd(0) + extended_date = pd.to_datetime(extended_date + pd.offsets.MonthEnd(0)).date() if extended_date not in dates: if all([os.path.exists(ds_config.var_filepath(var_config, [extended_date])) From bcd70ff5daa4e74be287634d3ebd3fdca703b289 Mon Sep 17 00:00:00 2001 From: James Byrne Date: Fri, 30 May 2025 12:43:46 +0100 Subject: [PATCH 07/19] Adding lock=False to open_mfdataset as per xarray#3961 - we do not specify engine --- preprocess_toolbox/processor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/preprocess_toolbox/processor.py b/preprocess_toolbox/processor.py index b95969d..598dde9 100644 --- a/preprocess_toolbox/processor.py +++ b/preprocess_toolbox/processor.py @@ -388,7 +388,8 @@ def _process_channel(self, combine="nested", coords="minimal", compat="override", - parallel=self._parallel) + parallel=self._parallel, + lock=False) da = getattr(ds, var_name) da = da.astype(self.dtype) From 84732a06bf74524b9ee61dfde602298c99761ed1 Mon Sep 17 00:00:00 2001 From: James Byrne Date: Wed, 4 Jun 2025 13:17:36 +0100 Subject: [PATCH 08/19] Updating open and split analysis to better handle lag/lead --- preprocess_toolbox/processor.py | 38 +++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/preprocess_toolbox/processor.py b/preprocess_toolbox/processor.py index 598dde9..f97c051 100644 --- a/preprocess_toolbox/processor.py +++ b/preprocess_toolbox/processor.py @@ -95,8 +95,16 @@ def __init__(self, self._normalisation_splits = [] if normalisation_splits is None else normalisation_splits self._parallel = parallel_opens self._refdir = ref_procdir + + ## + # Split dates - + # + # TODO: splits -> { dates, sources }, but currently sources are separate... self._splits = splits + self._valid_split_dates = splits + # TODO: add self._dropped_dates based on DATA + self._source_files = dict() if init_source: @@ -223,16 +231,17 @@ def _init_source_data(self, :return: """ - split_dates_required = dict() + # TODO: distracted, but this needs to be better written drop_dates = dict() + all_dates = dict() for split in self._splits.keys(): - dates = sorted(self._splits[split]) + all_dates[split] = sorted(self._splits[split]) drop_dates[split] = list() - if dates: + if all_dates[split]: logging.info("Processing {} dates for {} category: {} - {}". - format(len(dates), split, min(dates), max(dates))) + format(len(all_dates[split]), split, min(all_dates[split]), max(all_dates[split]))) else: logging.info("No {} dates for this processor".format(split)) continue @@ -240,24 +249,24 @@ def _init_source_data(self, # Calculating lead and lag dates that aren't already accounted for in splits if self._lag_time > 0: logging.info("Including lag of {} {}s".format(self._lag_time, ds_config.frequency.attribute)) - additional_lag_dates, dropped_lag_dates = get_extension_dates(ds_config, dates, self._lag_time + 1, + additional_lag_dates, dropped_lag_dates = get_extension_dates(ds_config, all_dates[split], self._lag_time + 2, start_step=1, reverse=True) - dates += additional_lag_dates + all_dates[split] += additional_lag_dates drop_dates[split] += dropped_lag_dates logging.info("Lag added {} dates for {} category: {} - {}". - format(len(dates), split, min(dates), max(dates))) + format(len(all_dates[split]), split, min(all_dates[split]), max(all_dates[split]))) if self._lead_time > 0: logging.info("Including lead of {} {}s".format(self._lead_time, ds_config.frequency.attribute)) - additional_lead_dates, dropped_lead_dates = get_extension_dates(ds_config, dates, self._lead_time) - dates += additional_lead_dates + additional_lead_dates, dropped_lead_dates = get_extension_dates(ds_config, all_dates[split], self._lead_time) + all_dates[split] += additional_lead_dates drop_dates[split] += dropped_lead_dates logging.info("Lead added {} dates for {} category: {} - {}". - format(len(dates), split, min(dates), max(dates))) + format(len(all_dates[split]), split, min(all_dates[split]), max(all_dates[split]))) - split_dates_required[split] = sorted([_ for _ in dates if _ not in drop_dates[split]]) + self._valid_split_dates[split] = sorted([_ for _ in all_dates[split] if _ not in drop_dates[split]]) for split in self._splits.keys(): - self._source_files[split] = {var_config.name: ds_config.var_filepaths(var_config, split_dates_required[split]) + self._source_files[split] = {var_config.name: ds_config.var_filepaths(var_config, all_dates[split]) for var_config in ds_config.variables} for var_name, var_files in self._source_files[split].items(): @@ -385,9 +394,6 @@ def _process_channel(self, # data so this was harder. Now we work with whatever we get from download-toolbox ds = xr.open_mfdataset( source_files, - combine="nested", - coords="minimal", - compat="override", parallel=self._parallel, lock=False) da = getattr(ds, var_name) @@ -503,7 +509,7 @@ def get_config(self, **kwargs): "path": self.path, "processed_files": self._processed_files, "source_files": self._source_files, - "splits": self.splits, + "splits": self._valid_split_dates, } @staticmethod From 728bdd85c68c652676e722b11de0b24ced8b68bb Mon Sep 17 00:00:00 2001 From: James Byrne Date: Tue, 10 Jun 2025 00:16:25 +0100 Subject: [PATCH 09/19] Fixing single time entry for rotation --- preprocess_toolbox/dataset/process.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/preprocess_toolbox/dataset/process.py b/preprocess_toolbox/dataset/process.py index 2cda6ad..9f52582 100644 --- a/preprocess_toolbox/dataset/process.py +++ b/preprocess_toolbox/dataset/process.py @@ -150,6 +150,9 @@ def rotate_dataset(ref_file: os.PathLike, wind_cubes[vars_to_rotate[1]], angles, ) + if len(wind_cubes_r[vars_to_rotate[0]].shape) == 2 and len(wind_cubes_r[vars_to_rotate[1]].shape) == 2: + wind_cubes_r[vars_to_rotate[0]] = iris.util.new_axis(wind_cubes_r[vars_to_rotate[0]], "time") + wind_cubes_r[vars_to_rotate[1]] = iris.util.new_axis(wind_cubes_r[vars_to_rotate[1]], "time") except iris.exceptions.CoordinateNotFoundError: logging.exception("Failure to rotate due to coordinate issues. " "moving onto next file") From 7937d9819096d08da606dc7309c394743adc3595 Mon Sep 17 00:00:00 2001 From: James Byrne Date: Tue, 10 Jun 2025 00:16:54 +0100 Subject: [PATCH 10/19] Resolving some of the intricacies of lag dates with predictions --- preprocess_toolbox/processor.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/preprocess_toolbox/processor.py b/preprocess_toolbox/processor.py index f97c051..f420bf2 100644 --- a/preprocess_toolbox/processor.py +++ b/preprocess_toolbox/processor.py @@ -102,7 +102,7 @@ def __init__(self, # TODO: splits -> { dates, sources }, but currently sources are separate... self._splits = splits - self._valid_split_dates = splits + self._dropped_split_dates = {} # TODO: add self._dropped_dates based on DATA self._source_files = dict() @@ -231,7 +231,6 @@ def _init_source_data(self, :return: """ - # TODO: distracted, but this needs to be better written drop_dates = dict() all_dates = dict() @@ -247,9 +246,9 @@ def _init_source_data(self, continue # Calculating lead and lag dates that aren't already accounted for in splits - if self._lag_time > 0: + if self._lag_time >= 0: logging.info("Including lag of {} {}s".format(self._lag_time, ds_config.frequency.attribute)) - additional_lag_dates, dropped_lag_dates = get_extension_dates(ds_config, all_dates[split], self._lag_time + 2, + additional_lag_dates, dropped_lag_dates = get_extension_dates(ds_config, all_dates[split], self._lag_time + 1, start_step=1, reverse=True) all_dates[split] += additional_lag_dates drop_dates[split] += dropped_lag_dates @@ -263,7 +262,8 @@ def _init_source_data(self, logging.info("Lead added {} dates for {} category: {} - {}". format(len(all_dates[split]), split, min(all_dates[split]), max(all_dates[split]))) - self._valid_split_dates[split] = sorted([_ for _ in all_dates[split] if _ not in drop_dates[split]]) + self._dropped_split_dates[split] = sorted(drop_dates[split]) + all_dates[split] = sorted([_ for _ in all_dates[split] if _ not in drop_dates[split]]) for split in self._splits.keys(): self._source_files[split] = {var_config.name: ds_config.var_filepaths(var_config, all_dates[split]) @@ -388,7 +388,6 @@ def _process_channel(self, if len(source_files) > 0: logging.info("Opening {} files for {}".format(len(source_files), var_name)) - logging.debug("Files to be opened:\n{}".format(pformat(source_files))) # In the old IceNet library there was dubiousness about the source of the # data so this was harder. Now we work with whatever we get from download-toolbox @@ -398,6 +397,7 @@ def _process_channel(self, lock=False) da = getattr(ds, var_name) da = da.astype(self.dtype) + logging.debug("Files to be opened: {}".format(da.dims)) # FIXME: we should ideally store train dates against the # normalisation and climatology, to ensure recalculation on @@ -509,7 +509,9 @@ def get_config(self, **kwargs): "path": self.path, "processed_files": self._processed_files, "source_files": self._source_files, - "splits": self._valid_split_dates, + "splits": {split: [ + date for date in dates if date not in self._dropped_split_dates[split] + ] for split, dates in self._splits.items()}, } @staticmethod From 681e7aaf1ded2dc978ad70c1e6553cacb8b919bd Mon Sep 17 00:00:00 2001 From: James Byrne Date: Thu, 12 Jun 2025 11:19:14 +0100 Subject: [PATCH 11/19] Improving date awareness when processing datasets --- preprocess_toolbox/utils.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/preprocess_toolbox/utils.py b/preprocess_toolbox/utils.py index cadc610..7862dfb 100644 --- a/preprocess_toolbox/utils.py +++ b/preprocess_toolbox/utils.py @@ -7,6 +7,7 @@ import orjson import pandas as pd +import xarray as xr from download_toolbox.interface import DatasetConfig, Frequency @@ -49,11 +50,24 @@ def get_extension_dates(ds_config: DatasetConfig, if ds_config.frequency == Frequency.MONTH: extended_date = pd.to_datetime(extended_date + pd.offsets.MonthEnd(0)).date() - if extended_date not in dates: - if all([os.path.exists(ds_config.var_filepath(var_config, [extended_date])) - for var_config in ds_config.variables]): - # We only add these dates into the mix if all necessary files exist - additional_dates.append(extended_date) + # Check we don't know we have data, and also ignore previous occurrences + if extended_date not in dates and extended_date not in additional_dates: + extended_date_var_files = [ds_config.var_filepath(var_config, [extended_date]) + for var_config in ds_config.variables] + if all([os.path.exists(df) for df in extended_date_var_files]): + # The above will catch those items that fall outside the file output boundary, but not missing + # dates within ALL files. This next clause is more expensive, but necessary to catch everything! + logging.debug("Files exist, double checking whether {} appears in data itself across {} files". + format(extended_date, len(extended_date_var_files))) + + # TODO: this won't catch partially available dates where not all files have the date, but some do + if pd.Timestamp(extended_date) in xr.open_mfdataset(extended_date_var_files).time.values: + # We only add these dates into the mix if all necessary files exist + additional_dates.append(extended_date) + else: + logging.warning("Nope, {} not in data itself so dropping {}".format(extended_date, date)) + dropped_dates.append(date) + break else: # Otherwise, warn that the lag data means this is being dropped logging.warning("{} will be dropped due to missing data {}". From 35f57eecd8017fae0bf1c7a4a4ca80ca77e905d4 Mon Sep 17 00:00:00 2001 From: James Byrne Date: Thu, 12 Jun 2025 11:27:06 +0100 Subject: [PATCH 12/19] Correcting lag indexing --- preprocess_toolbox/processor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/preprocess_toolbox/processor.py b/preprocess_toolbox/processor.py index f420bf2..8a483b9 100644 --- a/preprocess_toolbox/processor.py +++ b/preprocess_toolbox/processor.py @@ -248,8 +248,11 @@ def _init_source_data(self, # Calculating lead and lag dates that aren't already accounted for in splits if self._lag_time >= 0: logging.info("Including lag of {} {}s".format(self._lag_time, ds_config.frequency.attribute)) - additional_lag_dates, dropped_lag_dates = get_extension_dates(ds_config, all_dates[split], self._lag_time + 1, - start_step=1, reverse=True) + additional_lag_dates, dropped_lag_dates = get_extension_dates( + ds_config, all_dates[split], + # We offset by two, because -1 is channel one, so we need to account for lag == 1 being -2 + self._lag_time + 2, + start_step=1, reverse=True) all_dates[split] += additional_lag_dates drop_dates[split] += dropped_lag_dates logging.info("Lag added {} dates for {} category: {} - {}". From 941bfdbf9401bb37464c08b82b64a83411cd91e7 Mon Sep 17 00:00:00 2001 From: "Bryn N. Ubald" <55503826+bnubald@users.noreply.github.com> Date: Wed, 25 Jun 2025 11:36:24 +0100 Subject: [PATCH 13/19] Add vim swap files to .gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index d80203e..9cfbce0 100644 --- a/.gitignore +++ b/.gitignore @@ -108,5 +108,8 @@ ENV/ .vscode/ .idea/ +# vim +*.swp + /data/ /preprocess_toolbox/scratches/ From 4704c9674222f8f237f716a26c34ed919fb1f375 Mon Sep 17 00:00:00 2001 From: "Bryn N. Ubald" <55503826+bnubald@users.noreply.github.com> Date: Wed, 25 Jun 2025 11:37:56 +0100 Subject: [PATCH 14/19] Fix #37: Enable alternate output paths --- preprocess_toolbox/dataset/cli.py | 6 ++++++ preprocess_toolbox/loader/cli.py | 9 ++++++++- preprocess_toolbox/processor.py | 1 + 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/preprocess_toolbox/dataset/cli.py b/preprocess_toolbox/dataset/cli.py index c54bac0..6ac7b98 100644 --- a/preprocess_toolbox/dataset/cli.py +++ b/preprocess_toolbox/dataset/cli.py @@ -25,6 +25,11 @@ def process_dataset(): ds_config = get_dataset_config_implementation(args.source) splits = process_split_args(args, frequency=ds_config.frequency) + # Overwrite argparse default for this func if base path not provided + base_path = args.destination_path + if base_path == "processed_data": + base_path = os.path.join(".", "processed") + implementation = NormalisingChannelProcessor \ if args.implementation is None \ else get_implementation(args.implementation) @@ -36,6 +41,7 @@ def process_dataset(): anom_clim_splits=args.processing_splits, config_path=args.config, identifier=args.destination_id, + base_path=base_path, # TODO: nomenclature is old here, lag and lead make sense in forecasting, but not in here # so this mapping should be revised throughout the library - we don't necessarily forecast! lag_time=args.split_head, diff --git a/preprocess_toolbox/loader/cli.py b/preprocess_toolbox/loader/cli.py index cbeb61b..b202f2c 100644 --- a/preprocess_toolbox/loader/cli.py +++ b/preprocess_toolbox/loader/cli.py @@ -142,9 +142,16 @@ def get_channel_info_from_processor(cfg_segment: str): # but this library doesn't care or know of it respectively. raise RuntimeError("--config-path is invalid for this CLI endpoint, sorry...") + # Overwrite argparse default for this func if base path not provided + base_path = args.destination_path + if base_path == "processed_data": + base_path = os.path.join(".", "processed") + processor = proc_impl(ds_config, [args.channel_name,], - args.channel_name) + args.channel_name, + base_path=base_path, + ) processor.process() update_config(get_config_filename(args), cfg_segment, diff --git a/preprocess_toolbox/processor.py b/preprocess_toolbox/processor.py index 8a483b9..c5818a4 100644 --- a/preprocess_toolbox/processor.py +++ b/preprocess_toolbox/processor.py @@ -502,6 +502,7 @@ def get_config(self, **kwargs): return { "implementation": "{}:{}".format(self.__module__, self.__class__.__name__), + "base_path": self._base_path, "anomoly_vars": self._anom_vars, "absolute_vars": self.abs_vars, "dataset_config": self._dataset_config, From 5499f2b465f3de6a7ff8206889b5c99a3e22745f Mon Sep 17 00:00:00 2001 From: "Bryn N. Ubald" <55503826+bnubald@users.noreply.github.com> Date: Wed, 25 Jun 2025 14:47:56 +0100 Subject: [PATCH 15/19] Add vim swo files to .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 9cfbce0..cce7ef6 100644 --- a/.gitignore +++ b/.gitignore @@ -110,6 +110,7 @@ ENV/ # vim *.swp +*.swo /data/ /preprocess_toolbox/scratches/ From dad17232553624ed519fd36246726df341d97a0f Mon Sep 17 00:00:00 2001 From: "Bryn N. Ubald" <55503826+bnubald@users.noreply.github.com> Date: Wed, 25 Jun 2025 14:49:41 +0100 Subject: [PATCH 16/19] Fix #37: Cleaner alternate output paths --- preprocess_toolbox/cli.py | 3 ++- preprocess_toolbox/dataset/cli.py | 9 ++------- preprocess_toolbox/loader/cli.py | 13 ++++--------- 3 files changed, 8 insertions(+), 17 deletions(-) diff --git a/preprocess_toolbox/cli.py b/preprocess_toolbox/cli.py index 95ca6d3..d278a1b 100644 --- a/preprocess_toolbox/cli.py +++ b/preprocess_toolbox/cli.py @@ -16,13 +16,14 @@ class ProcessingArgParser(BaseArgParser): def __init__(self, *args, + base_path="processed_data", **kwargs): super().__init__(*args, **kwargs) self.add_argument("source", type=str) self.add_argument("-p", "--destination-path", help="Folder that any output data collections will be put in", - type=str, default="processed_data") + type=str, default=base_path) def add_ref_ds(self): self.add_argument("reference", type=str) diff --git a/preprocess_toolbox/dataset/cli.py b/preprocess_toolbox/dataset/cli.py index 6ac7b98..056f478 100644 --- a/preprocess_toolbox/dataset/cli.py +++ b/preprocess_toolbox/dataset/cli.py @@ -14,7 +14,7 @@ def process_dataset(): - args = (ProcessingArgParser(). + args = (ProcessingArgParser(base_path="processed"). add_concurrency(). add_destination(). add_implementation(). @@ -25,11 +25,6 @@ def process_dataset(): ds_config = get_dataset_config_implementation(args.source) splits = process_split_args(args, frequency=ds_config.frequency) - # Overwrite argparse default for this func if base path not provided - base_path = args.destination_path - if base_path == "processed_data": - base_path = os.path.join(".", "processed") - implementation = NormalisingChannelProcessor \ if args.implementation is None \ else get_implementation(args.implementation) @@ -41,7 +36,7 @@ def process_dataset(): anom_clim_splits=args.processing_splits, config_path=args.config, identifier=args.destination_id, - base_path=base_path, + base_path=args.destination_path, # TODO: nomenclature is old here, lag and lead make sense in forecasting, but not in here # so this mapping should be revised throughout the library - we don't necessarily forecast! lag_time=args.split_head, diff --git a/preprocess_toolbox/loader/cli.py b/preprocess_toolbox/loader/cli.py index b202f2c..5f7655b 100644 --- a/preprocess_toolbox/loader/cli.py +++ b/preprocess_toolbox/loader/cli.py @@ -45,12 +45,12 @@ def add_sections(self): class MetaArgParser(LoaderArgParser): - def __init__(self): + def __init__(self, base_path="processed_data"): super().__init__() self.add_argument("ground_truth_dataset") self.add_argument("-p", "--destination-path", help="Folder that any output data collections will be put in", - type=str, default="processed_data") + type=str, default=base_path) def add_channel(self): self.add_argument("channel_name") @@ -129,7 +129,7 @@ def add_processed(): def get_channel_info_from_processor(cfg_segment: str): - args = (MetaArgParser(). + args = (MetaArgParser(base_path="processed"). add_channel(). parse_args()) @@ -142,15 +142,10 @@ def get_channel_info_from_processor(cfg_segment: str): # but this library doesn't care or know of it respectively. raise RuntimeError("--config-path is invalid for this CLI endpoint, sorry...") - # Overwrite argparse default for this func if base path not provided - base_path = args.destination_path - if base_path == "processed_data": - base_path = os.path.join(".", "processed") - processor = proc_impl(ds_config, [args.channel_name,], args.channel_name, - base_path=base_path, + base_path=args.destination_path, ) processor.process() update_config(get_config_filename(args), From f1353886059d43746aca932f7cec04a5f78bb577 Mon Sep 17 00:00:00 2001 From: "Bryn N. Ubald" <55503826+bnubald@users.noreply.github.com> Date: Thu, 26 Jun 2025 15:04:49 +0100 Subject: [PATCH 17/19] Fix #37: Pass unknown args to implementation Also, allow non-default input loader-path --- preprocess_toolbox/loader/cli.py | 37 +++++++++++++++++++++----------- preprocess_toolbox/utils.py | 7 ++++++ 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/preprocess_toolbox/loader/cli.py b/preprocess_toolbox/loader/cli.py index 5f7655b..3822606 100644 --- a/preprocess_toolbox/loader/cli.py +++ b/preprocess_toolbox/loader/cli.py @@ -51,6 +51,9 @@ def __init__(self, base_path="processed_data"): self.add_argument("-p", "--destination-path", help="Folder that any output data collections will be put in", type=str, default=base_path) + self.add_argument("-l", "--loader-path", + help="Path to the loader JSON config file to load", + type=str, default=None) def add_channel(self): self.add_argument("channel_name") @@ -74,6 +77,9 @@ def create(): channels=dict(), ) destination_path = get_config_filename(args) + destination_directory = os.path.dirname(destination_path) + if destination_directory: + os.makedirs(destination_directory, exist_ok=True) if not os.path.exists(destination_path): with open(destination_path, "w") as fh: @@ -129,24 +135,31 @@ def add_processed(): def get_channel_info_from_processor(cfg_segment: str): - args = (MetaArgParser(base_path="processed"). + args, unknown_args = (MetaArgParser(base_path="processed"). add_channel(). - parse_args()) + parse_known_args()) proc_impl = get_implementation(args.implementation) ds_config = get_dataset_config_implementation(args.ground_truth_dataset) if args.config is not None: - # FIXME: args.config contains the location of the dataset config on render, but - # this is not part of this pattern! DS is either ground truth or in derived class, - # but this library doesn't care or know of it respectively. - raise RuntimeError("--config-path is invalid for this CLI endpoint, sorry...") - - processor = proc_impl(ds_config, - [args.channel_name,], - args.channel_name, - base_path=args.destination_path, - ) + # FIXME: args.config contains the location of the dataset config on render, but + # this is not part of this pattern! DS is either ground truth or in derived class, + # but this library doesn't care or know of it respectively. + raise RuntimeError("--config-path is invalid for this CLI endpoint, sorry...") + + impl_args = ( + ds_config, + [ + args.channel_name, + ], + args.channel_name, + ) + impl_kwargs = {"base_path": args.destination_path} + if unknown_args: + impl_kwargs |= unknown_args + + processor = proc_impl(*impl_args, **impl_kwargs) processor.process() update_config(get_config_filename(args), cfg_segment, diff --git a/preprocess_toolbox/utils.py b/preprocess_toolbox/utils.py index 7862dfb..138e2b5 100644 --- a/preprocess_toolbox/utils.py +++ b/preprocess_toolbox/utils.py @@ -25,6 +25,13 @@ def get_config_filename(args: argparse.Namespace, prefix: str = "loader"): if prefix is not None: default_loader_config = "{}.{}".format(prefix, default_loader_config) + if ( + "loader_path" in args + and args.loader_path is not None + and (os.path.isfile(args.loader_path) or not os.path.exists(args.loader_path)) + ): + return args.loader_path + # TODO: this is a bit grim, but to allow different config output paths it's very flexible. refactor if args.config is not None and (os.path.isfile(args.config) or not os.path.exists(args.config)): logging.warning("{} has been specified, overriding default name {}".format(args.config, args.name)) From eb44bbfc32b7950401c2562f133645af6ee88ec5 Mon Sep 17 00:00:00 2001 From: Bryn Noel Ubald <55503826+bnubald@users.noreply.github.com> Date: Wed, 3 Sep 2025 11:29:37 +0100 Subject: [PATCH 18/19] Use h5netcdf engine to read in processor.py Fixes Seg Fault issue when preprocessor attempts to read from a netcdf file with chunked dataarrays. Refs: #40 --- preprocess_toolbox/processor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/preprocess_toolbox/processor.py b/preprocess_toolbox/processor.py index c5818a4..92f8792 100644 --- a/preprocess_toolbox/processor.py +++ b/preprocess_toolbox/processor.py @@ -396,6 +396,7 @@ def _process_channel(self, # data so this was harder. Now we work with whatever we get from download-toolbox ds = xr.open_mfdataset( source_files, + engine="h5netcdf", parallel=self._parallel, lock=False) da = getattr(ds, var_name) From 6992459716d4bf1339e909ec93bd6b0e8efda06d Mon Sep 17 00:00:00 2001 From: Bryn Noel Ubald <55503826+bnubald@users.noreply.github.com> Date: Wed, 3 Sep 2025 11:42:14 +0100 Subject: [PATCH 19/19] Fix repeated FutureWarnings on compat="no_conflicts" FutureWarning on compat not being "no_conflicts" by default in future versions. Refs: #42 --- preprocess_toolbox/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/preprocess_toolbox/utils.py b/preprocess_toolbox/utils.py index 138e2b5..6507e84 100644 --- a/preprocess_toolbox/utils.py +++ b/preprocess_toolbox/utils.py @@ -68,7 +68,7 @@ def get_extension_dates(ds_config: DatasetConfig, format(extended_date, len(extended_date_var_files))) # TODO: this won't catch partially available dates where not all files have the date, but some do - if pd.Timestamp(extended_date) in xr.open_mfdataset(extended_date_var_files).time.values: + if pd.Timestamp(extended_date) in xr.open_mfdataset(extended_date_var_files, compat="no_conflicts").time.values: # We only add these dates into the mix if all necessary files exist additional_dates.append(extended_date) else: