From 951001584076112cc2f01c9875fbe59dc7cfb4d3 Mon Sep 17 00:00:00 2001 From: "Eric M. Baker" Date: Wed, 18 Jan 2023 15:32:23 -0800 Subject: [PATCH 1/4] ENH+BF: Add '--indefinite-mode' flag to the forward command of the CLI, and resolve errors when writing datasets to disk and the transfer syntax is not ImplicitVRLittleEndian. --- dcm/cli.py | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) mode change 100644 => 100755 dcm/cli.py diff --git a/dcm/cli.py b/dcm/cli.py old mode 100644 new mode 100755 index 9bb9775..a3056c2 --- a/dcm/cli.py +++ b/dcm/cli.py @@ -718,7 +718,7 @@ def _make_route_data_cb( """Return callback that queues dataset/metadata from incoming events""" async def callback(event: evt.Event) -> int: - # TODO: Do we need to embed the file_meta here? + event.dataset.file_meta = event.file_meta await res_q.put(event.dataset) return 0x0 # Success @@ -726,7 +726,10 @@ async def callback(event: evt.Event) -> int: async def _do_route( - local: DcmNode, router: Router, inactive_timeout: Optional[int] = None + local: DcmNode, + router: Router, + inactive_timeout: Optional[int] = None, + indefinite_mode: bool = False, ) -> None: local_ent = LocalEntity(local) event_filter = EventFilter(event_types=frozenset((evt.EVT_C_STORE,))) @@ -735,6 +738,7 @@ async def _do_route( if inactive_timeout: last_update = datetime.now() last_reported = 0 + num_flushed = 0 async with router.route(report=report) as route_q: fwd_cb = _make_route_data_cb(route_q) async with local_ent.listen(fwd_cb, event_filter=event_filter): @@ -751,8 +755,13 @@ async def _do_route( if ( datetime.now() - last_update ).total_seconds() > inactive_timeout: - print("Timeout due to inactivity") - break + if not indefinite_mode: + print("Timeout due to inactivity") + break + elif report.n_reported > num_flushed: + num_flushed = report.n_reported + report.log_issues() + report.clear() finally: print("Listener shutting down") @@ -774,10 +783,20 @@ async def _do_route( @click.option( "--inactive-timeout", type=int, - help="Stop listening after this many seconds of inactivity", + help="Stop listening after this many seconds of inactivity, unless running in " + "`--indefinite-mode`. If running in `--indefinite-mode`, then flush any accumulated " + "errors/issues after this many seconds of inactivity.", +) +@click.option( + "--indefinite-mode", + is_flag=True, + help="Run as a long lived listener. After `--inactive-timeout` seconds " + "of inactivity, any issues/errors that have accumulated since the last pause in " + "activity will be logged/raised. If no errors are raised, then the listener will " + "continue running.", ) def forward( - params, dests, edit, edit_json, local, dir_format, out_file_ext, inactive_timeout + params, dests, edit, edit_json, local, dir_format, out_file_ext, inactive_timeout, indefinite_mode ): """Listen for incoming DICOM files on network and forward to dests""" local = params["config"].get_local_node(local) @@ -813,7 +832,7 @@ def forward( dests = params["config"].get_routes(dests) router = Router(dests) - asyncio.run(_do_route(local, router, inactive_timeout)) + asyncio.run(_do_route(local, router, inactive_timeout, indefinite_mode)) def make_print_cb(fmt, elem_filter=None): From 38cdee80928cc59f9a3fcc885146c44a82b4e512 Mon Sep 17 00:00:00 2001 From: "Eric M. Baker" Date: Wed, 18 Jan 2023 15:40:59 -0800 Subject: [PATCH 2/4] BF: Fix issue where configuration parameters for LocalDir that could be passed through the CLI but were not were causing those configuration parameters in the configuration file to be replaced with `None`. --- dcm/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) mode change 100644 => 100755 dcm/conf.py diff --git a/dcm/conf.py b/dcm/conf.py old mode 100644 new mode 100755 index 4462264..fad4ca6 --- a/dcm/conf.py +++ b/dcm/conf.py @@ -288,7 +288,7 @@ def set_net_repo_kwargs(self, **kwargs: Dict[str, Any]) -> None: def set_local_dir_kwargs(self, **kwargs: Dict[str, Any]) -> None: """Override parameters for any LocalDir built""" - self._local_dir_kwargs = kwargs + self._local_dir_kwargs = {k: v for k, v in kwargs.items() if v is not None} def set_static_route_kwargs(self, **kwargs: Dict[str, Any]) -> None: """Override parameters for any StaticRoute""" From 8931c63493fad5f1505f0755abc9782fd82e3931 Mon Sep 17 00:00:00 2001 From: "Eric M. Baker" Date: Wed, 1 Feb 2023 14:21:13 -0800 Subject: [PATCH 3/4] BF: Fully clear DynamicTransferReport --- dcm/route.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) mode change 100644 => 100755 dcm/route.py diff --git a/dcm/route.py b/dcm/route.py old mode 100644 new mode 100755 index a9a5a3d..d72b664 --- a/dcm/route.py +++ b/dcm/route.py @@ -548,7 +548,12 @@ def n_warnings(self) -> int: @property def n_reported(self) -> int: - return self.store_reports.n_input + # return self.store_reports.n_input + count = 0 + for store_report_list in self.store_reports.values(): + for store_report in store_report_list: + count += store_report.n_input + return count def add_store_report( self, dest: DataBucket[Any, Any], store_report: StoreReportType @@ -586,6 +591,11 @@ def clear(self) -> None: # needed. Not clear if it makes sense to do anything about it # here. super().clear() + # Must explicitly set the MultiReportList reports as done for them + # to be cleared. + for report_list in self.store_reports.values(): + if all(report.done for report in report_list): + report_list.done = True self.store_reports.clear() From 05377b27a32cf158184c650258dca3847fb17c88 Mon Sep 17 00:00:00 2001 From: "Eric M. Baker" Date: Wed, 1 Feb 2023 14:38:21 -0800 Subject: [PATCH 4/4] BF: Only log issues when in indefinite mode when all sub-reports are done. --- dcm/cli.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dcm/cli.py b/dcm/cli.py index a3056c2..3143ebf 100755 --- a/dcm/cli.py +++ b/dcm/cli.py @@ -1,7 +1,7 @@ """Command line interface""" from __future__ import annotations import csv -import sys, os, logging, json, re, signal +import sys, os, logging, json, re, signal, itertools import asyncio from contextlib import ExitStack from copy import deepcopy @@ -738,7 +738,6 @@ async def _do_route( if inactive_timeout: last_update = datetime.now() last_reported = 0 - num_flushed = 0 async with router.route(report=report) as route_q: fwd_cb = _make_route_data_cb(route_q) async with local_ent.listen(fwd_cb, event_filter=event_filter): @@ -746,7 +745,7 @@ async def _do_route( try: while True: await asyncio.sleep(1.0) - if last_update is not None: + if last_update is not None and report.all_reported: n_reported = report.n_reported if n_reported != last_reported: last_update = datetime.now() @@ -758,10 +757,11 @@ async def _do_route( if not indefinite_mode: print("Timeout due to inactivity") break - elif report.n_reported > num_flushed: - num_flushed = report.n_reported + elif report.n_sent > 0: + if any(not store_report.done for store_report in itertools.chain.from_iterable(report.store_reports.values())): + continue report.log_issues() - report.clear() + report.clear() finally: print("Listener shutting down")