Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions dcm/cli.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Command line interface"""
from __future__ import annotations
import csv
import sys, os, logging, json, re, signal
import sys, os, logging, json, re, signal, itertools
import asyncio
from contextlib import ExitStack
from copy import deepcopy
Expand Down Expand Up @@ -718,15 +718,18 @@ def _make_route_data_cb(
"""Return callback that queues dataset/metadata from incoming events"""

async def callback(event: evt.Event) -> int:
# TODO: Do we need to embed the file_meta here?
event.dataset.file_meta = event.file_meta
await res_q.put(event.dataset)
return 0x0 # Success

return callback


async def _do_route(
local: DcmNode, router: Router, inactive_timeout: Optional[int] = None
local: DcmNode,
router: Router,
inactive_timeout: Optional[int] = None,
indefinite_mode: bool = False,
) -> None:
local_ent = LocalEntity(local)
event_filter = EventFilter(event_types=frozenset((evt.EVT_C_STORE,)))
Expand All @@ -742,7 +745,7 @@ async def _do_route(
try:
while True:
await asyncio.sleep(1.0)
if last_update is not None:
if last_update is not None and report.all_reported:
n_reported = report.n_reported
if n_reported != last_reported:
last_update = datetime.now()
Expand All @@ -751,8 +754,14 @@ async def _do_route(
if (
datetime.now() - last_update
).total_seconds() > inactive_timeout:
print("Timeout due to inactivity")
break
if not indefinite_mode:
print("Timeout due to inactivity")
break
elif report.n_sent > 0:
if any(not store_report.done for store_report in itertools.chain.from_iterable(report.store_reports.values())):
continue
report.log_issues()
report.clear()
finally:
print("Listener shutting down")

Expand All @@ -774,10 +783,20 @@ async def _do_route(
@click.option(
"--inactive-timeout",
type=int,
help="Stop listening after this many seconds of inactivity",
help="Stop listening after this many seconds of inactivity, unless running in "
"`--indefinite-mode`. If running in `--indefinite-mode`, then flush any accumulated "
"errors/issues after this many seconds of inactivity.",
)
@click.option(
"--indefinite-mode",
is_flag=True,
help="Run as a long lived listener. After `--inactive-timeout` seconds "
"of inactivity, any issues/errors that have accumulated since the last pause in "
"activity will be logged/raised. If no errors are raised, then the listener will "
"continue running.",
)
def forward(
params, dests, edit, edit_json, local, dir_format, out_file_ext, inactive_timeout
params, dests, edit, edit_json, local, dir_format, out_file_ext, inactive_timeout, indefinite_mode
):
"""Listen for incoming DICOM files on network and forward to dests"""
local = params["config"].get_local_node(local)
Expand Down Expand Up @@ -813,7 +832,7 @@ def forward(
dests = params["config"].get_routes(dests)

router = Router(dests)
asyncio.run(_do_route(local, router, inactive_timeout))
asyncio.run(_do_route(local, router, inactive_timeout, indefinite_mode))


def make_print_cb(fmt, elem_filter=None):
Expand Down
2 changes: 1 addition & 1 deletion dcm/conf.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def set_net_repo_kwargs(self, **kwargs: Dict[str, Any]) -> None:

def set_local_dir_kwargs(self, **kwargs: Dict[str, Any]) -> None:
"""Override parameters for any LocalDir built"""
self._local_dir_kwargs = kwargs
self._local_dir_kwargs = {k: v for k, v in kwargs.items() if v is not None}

def set_static_route_kwargs(self, **kwargs: Dict[str, Any]) -> None:
"""Override parameters for any StaticRoute"""
Expand Down
12 changes: 11 additions & 1 deletion dcm/route.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,12 @@ def n_warnings(self) -> int:

@property
def n_reported(self) -> int:
return self.store_reports.n_input
# return self.store_reports.n_input
count = 0
for store_report_list in self.store_reports.values():
for store_report in store_report_list:
count += store_report.n_input
return count

def add_store_report(
self, dest: DataBucket[Any, Any], store_report: StoreReportType
Expand Down Expand Up @@ -586,6 +591,11 @@ def clear(self) -> None:
# needed. Not clear if it makes sense to do anything about it
# here.
super().clear()
# Must explicitly set the MultiReportList reports as done for them
# to be cleared.
for report_list in self.store_reports.values():
if all(report.done for report in report_list):
report_list.done = True
self.store_reports.clear()


Expand Down