From 5247be1eb2aacb6576af2be8595f7acdcfb51772 Mon Sep 17 00:00:00 2001 From: Peter Ercius Date: Wed, 22 Apr 2026 16:00:27 -0700 Subject: [PATCH 1/8] fix parameter get to match json parameter name. Update comments. Improve logging. --- operators/read-tem-data/run.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/operators/read-tem-data/run.py b/operators/read-tem-data/run.py index 2778a9a9..0b2579fa 100644 --- a/operators/read-tem-data/run.py +++ b/operators/read-tem-data/run.py @@ -15,7 +15,7 @@ data_dir = Path(f"{DATA_DIRECTORY}/raw_data_dir") @operator -def read_data_pae( +def read_data_ncempy( inputs: BytesMessage | None, parameters: dict[str, Any] ) -> BytesMessage | None: """This reads data from disk and sends it on.""" @@ -23,17 +23,16 @@ def read_data_pae( # This operator does not require inputs # Extract parameters - directory = parameters.get("directory", "/test_data") + directory = parameters.get("raw_data_dir", "/test_data") file = parameters.get("file", "test.emd") - # mount_dir = parameters.get("mount_dir", "~") - # TODO: Implement operator logic here + # Read data from disk logger.info("read_tem_data operator running...") - logger.info(f'directory: {directory}') - logger.info(f'mount directory: {data_dir}') - logger.info(f'file: {file}') - # file_path = Path(directory) / Path(file) + logger.info(f"parameter directory: {directory}") + logger.info(f"internal mount directory: {data_dir}") + logger.info(f"file name: {file}") + file_path = data_dir / Path(file) try: dd = ncempy.read(file_path) @@ -44,7 +43,7 @@ def read_data_pae( logger.info('Problem loading file. Using zeros array.') time.sleep(3.0) - # TODO: Process and return result + # Process and return result data_bytes = data.tobytes() header = MessageHeader(subject=MessageSubject.BYTES, meta={'shape': data.shape, 'dtype': str(data.dtype)}) return BytesMessage(header=header, data=data_bytes) From 18a94eb47c3e9e2347aa7462e72cc046b5f650f1 Mon Sep 17 00:00:00 2001 From: Peter Ercius Date: Wed, 22 Apr 2026 16:04:28 -0700 Subject: [PATCH 2/8] print error if the file is not loaded. Also, return None if file loading fails instead of zeros array. --- operators/read-tem-data/run.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/operators/read-tem-data/run.py b/operators/read-tem-data/run.py index 0b2579fa..61099e4d 100644 --- a/operators/read-tem-data/run.py +++ b/operators/read-tem-data/run.py @@ -38,12 +38,15 @@ def read_data_ncempy( dd = ncempy.read(file_path) data = dd['data'] logger.info(f'file data shape: {data.shape}') - except Exception: - data = np.zeros((100, 100), dtype=np.uint8) - logger.info('Problem loading file. Using zeros array.') + except Exception as e: + logger.info(f"Problem loading file. Error: {e}") + return None + + # Send the data every 3 seonds + # TODO: Use trigger instead of time.sleep to control when data is sent time.sleep(3.0) - # Process and return result + # Process and return result if the data was loaded successfully data_bytes = data.tobytes() header = MessageHeader(subject=MessageSubject.BYTES, meta={'shape': data.shape, 'dtype': str(data.dtype)}) return BytesMessage(header=header, data=data_bytes) From 14366ea353827127bfaa942b7b670ce120119d01 Mon Sep 17 00:00:00 2001 From: Peter Ercius Date: Tue, 26 May 2026 17:31:18 -0700 Subject: [PATCH 3/8] remove unneeded numpy import --- operators/electron-count-save/Containerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operators/electron-count-save/Containerfile b/operators/electron-count-save/Containerfile index 00b33a4e..6ca6c846 100644 --- a/operators/electron-count-save/Containerfile +++ b/operators/electron-count-save/Containerfile @@ -1,4 +1,4 @@ -FROM interactem-operator +FROM distiller-streaming RUN poetry add stempy h5py From 08f45deb8431e74b8aff44c47a8ec5e4aeecc187 Mon Sep 17 00:00:00 2001 From: Peter Ercius Date: Tue, 26 May 2026 17:32:13 -0700 Subject: [PATCH 4/8] remove unneeded numpy import --- operators/read-tem-data/run.py | 1 - 1 file changed, 1 deletion(-) diff --git a/operators/read-tem-data/run.py b/operators/read-tem-data/run.py index 61099e4d..5692010a 100644 --- a/operators/read-tem-data/run.py +++ b/operators/read-tem-data/run.py @@ -3,7 +3,6 @@ from typing import Any import ncempy -import numpy as np from interactem.core.logger import get_logger from interactem.core.models.messages import BytesMessage, MessageHeader, MessageSubject From c9d717520d6a64c9df26dd807515f37e3bb2c502 Mon Sep 17 00:00:00 2001 From: Peter Ercius Date: Tue, 26 May 2026 17:39:16 -0700 Subject: [PATCH 5/8] put the sleep in a finally block to ensure it executes. Need to change this to a trigger though. --- operators/read-tem-data/run.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/operators/read-tem-data/run.py b/operators/read-tem-data/run.py index 5692010a..6442ba81 100644 --- a/operators/read-tem-data/run.py +++ b/operators/read-tem-data/run.py @@ -40,10 +40,9 @@ def read_data_ncempy( except Exception as e: logger.info(f"Problem loading file. Error: {e}") return None - - # Send the data every 3 seonds - # TODO: Use trigger instead of time.sleep to control when data is sent - time.sleep(3.0) + finally: + # TODO: Use trigger instead of time.sleep to control when data is sent + time.sleep(3.0) # wait to read it again. # Process and return result if the data was loaded successfully data_bytes = data.tobytes() From 51853c719507e1d4cca8fadd7792ac82fab022d8 Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Wed, 27 May 2026 12:31:44 +0000 Subject: [PATCH 6/8] Update bake graph to inherit from distiller-streaming --- operators/docker-bake.hcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operators/docker-bake.hcl b/operators/docker-bake.hcl index b669479a..f8936ac8 100644 --- a/operators/docker-bake.hcl +++ b/operators/docker-bake.hcl @@ -156,7 +156,7 @@ target "electron-count" { } target "electron-count-save" { - inherits = ["common", "operator-context", "output"] + inherits = ["common", "distiller-streaming-context", "output"] context = "operators/electron-count-save" dockerfile = "Containerfile" tags = ["${REGISTRY}/electron-count-save"] From a6b2f4fc783ec3b0d45ad7566840265aae95c885 Mon Sep 17 00:00:00 2001 From: Peter Ercius Date: Wed, 27 May 2026 14:10:40 -0700 Subject: [PATCH 7/8] use trigger instead of 3 second timer --- operators/read-tem-data/operator.json | 8 ++++++++ operators/read-tem-data/run.py | 19 +++++++++++-------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/operators/read-tem-data/operator.json b/operators/read-tem-data/operator.json index d6d2dbc4..59bafc2e 100644 --- a/operators/read-tem-data/operator.json +++ b/operators/read-tem-data/operator.json @@ -29,6 +29,14 @@ "required": true } ], + "triggers": [ + { + "name": "read_now", + "label": "Read data now", + "description": "Manually read and emit the data without waiting.", + "mode": "single" + } + ], "parallel_config": { "type": "none" } diff --git a/operators/read-tem-data/run.py b/operators/read-tem-data/run.py index 6442ba81..b9e17eb2 100644 --- a/operators/read-tem-data/run.py +++ b/operators/read-tem-data/run.py @@ -15,11 +15,11 @@ @operator def read_data_ncempy( - inputs: BytesMessage | None, parameters: dict[str, Any] + inputs: BytesMessage | None, parameters: dict[str, Any], trigger=None ) -> BytesMessage | None: - """This reads data from disk and sends it on.""" - - # This operator does not require inputs + """Read and emit TEM data from a specified file when triggered.""" + if trigger is None: + return None # Extract parameters directory = parameters.get("raw_data_dir", "/test_data") @@ -40,11 +40,14 @@ def read_data_ncempy( except Exception as e: logger.info(f"Problem loading file. Error: {e}") return None - finally: - # TODO: Use trigger instead of time.sleep to control when data is sent - time.sleep(3.0) # wait to read it again. # Process and return result if the data was loaded successfully data_bytes = data.tobytes() - header = MessageHeader(subject=MessageSubject.BYTES, meta={'shape': data.shape, 'dtype': str(data.dtype)}) + header = MessageHeader( + subject=MessageSubject.BYTES, + meta={ + "shape": data.shape, + "dtype": str(data.dtype), + }, + ) return BytesMessage(header=header, data=data_bytes) From 990e5af545b3d838623927f1c6cec4fbfdef3403 Mon Sep 17 00:00:00 2001 From: Peter Ercius Date: Wed, 27 May 2026 14:12:22 -0700 Subject: [PATCH 8/8] remove time import --- operators/read-tem-data/run.py | 1 - 1 file changed, 1 deletion(-) diff --git a/operators/read-tem-data/run.py b/operators/read-tem-data/run.py index b9e17eb2..f829d45d 100644 --- a/operators/read-tem-data/run.py +++ b/operators/read-tem-data/run.py @@ -1,4 +1,3 @@ -import time from pathlib import Path from typing import Any