diff --git a/dcm/cli.py b/dcm/cli.py old mode 100644 new mode 100755 index 9bb9775..3143ebf --- a/dcm/cli.py +++ b/dcm/cli.py @@ -1,7 +1,7 @@ """Command line interface""" from __future__ import annotations import csv -import sys, os, logging, json, re, signal +import sys, os, logging, json, re, signal, itertools import asyncio from contextlib import ExitStack from copy import deepcopy @@ -718,7 +718,7 @@ def _make_route_data_cb( """Return callback that queues dataset/metadata from incoming events""" async def callback(event: evt.Event) -> int: - # TODO: Do we need to embed the file_meta here? + event.dataset.file_meta = event.file_meta await res_q.put(event.dataset) return 0x0 # Success @@ -726,7 +726,10 @@ async def callback(event: evt.Event) -> int: async def _do_route( - local: DcmNode, router: Router, inactive_timeout: Optional[int] = None + local: DcmNode, + router: Router, + inactive_timeout: Optional[int] = None, + indefinite_mode: bool = False, ) -> None: local_ent = LocalEntity(local) event_filter = EventFilter(event_types=frozenset((evt.EVT_C_STORE,))) @@ -742,7 +745,7 @@ async def _do_route( try: while True: await asyncio.sleep(1.0) - if last_update is not None: + if last_update is not None and report.all_reported: n_reported = report.n_reported if n_reported != last_reported: last_update = datetime.now() @@ -751,8 +754,14 @@ async def _do_route( if ( datetime.now() - last_update ).total_seconds() > inactive_timeout: - print("Timeout due to inactivity") - break + if not indefinite_mode: + print("Timeout due to inactivity") + break + elif report.n_sent > 0: + if any(not store_report.done for store_report in itertools.chain.from_iterable(report.store_reports.values())): + continue + report.log_issues() + report.clear() finally: print("Listener shutting down") @@ -774,10 +783,20 @@ async def _do_route( @click.option( "--inactive-timeout", type=int, - help="Stop listening after this many seconds of inactivity", + help="Stop listening after this many seconds of inactivity, unless running in " + "`--indefinite-mode`. If running in `--indefinite-mode`, then flush any accumulated " + "errors/issues after this many seconds of inactivity.", +) +@click.option( + "--indefinite-mode", + is_flag=True, + help="Run as a long lived listener. After `--inactive-timeout` seconds " + "of inactivity, any issues/errors that have accumulated since the last pause in " + "activity will be logged/raised. If no errors are raised, then the listener will " + "continue running.", ) def forward( - params, dests, edit, edit_json, local, dir_format, out_file_ext, inactive_timeout + params, dests, edit, edit_json, local, dir_format, out_file_ext, inactive_timeout, indefinite_mode ): """Listen for incoming DICOM files on network and forward to dests""" local = params["config"].get_local_node(local) @@ -813,7 +832,7 @@ def forward( dests = params["config"].get_routes(dests) router = Router(dests) - asyncio.run(_do_route(local, router, inactive_timeout)) + asyncio.run(_do_route(local, router, inactive_timeout, indefinite_mode)) def make_print_cb(fmt, elem_filter=None): diff --git a/dcm/conf.py b/dcm/conf.py old mode 100644 new mode 100755 index 4462264..fad4ca6 --- a/dcm/conf.py +++ b/dcm/conf.py @@ -288,7 +288,7 @@ def set_net_repo_kwargs(self, **kwargs: Dict[str, Any]) -> None: def set_local_dir_kwargs(self, **kwargs: Dict[str, Any]) -> None: """Override parameters for any LocalDir built""" - self._local_dir_kwargs = kwargs + self._local_dir_kwargs = {k: v for k, v in kwargs.items() if v is not None} def set_static_route_kwargs(self, **kwargs: Dict[str, Any]) -> None: """Override parameters for any StaticRoute""" diff --git a/dcm/route.py b/dcm/route.py old mode 100644 new mode 100755 index a9a5a3d..d72b664 --- a/dcm/route.py +++ b/dcm/route.py @@ -548,7 +548,12 @@ def n_warnings(self) -> int: @property def n_reported(self) -> int: - return self.store_reports.n_input + # return self.store_reports.n_input + count = 0 + for store_report_list in self.store_reports.values(): + for store_report in store_report_list: + count += store_report.n_input + return count def add_store_report( self, dest: DataBucket[Any, Any], store_report: StoreReportType @@ -586,6 +591,11 @@ def clear(self) -> None: # needed. Not clear if it makes sense to do anything about it # here. super().clear() + # Must explicitly set the MultiReportList reports as done for them + # to be cleared. + for report_list in self.store_reports.values(): + if all(report.done for report in report_list): + report_list.done = True self.store_reports.clear()