diff --git a/operators/docker-bake.hcl b/operators/docker-bake.hcl index b669479a..f8936ac8 100644 --- a/operators/docker-bake.hcl +++ b/operators/docker-bake.hcl @@ -156,7 +156,7 @@ target "electron-count" { } target "electron-count-save" { - inherits = ["common", "operator-context", "output"] + inherits = ["common", "distiller-streaming-context", "output"] context = "operators/electron-count-save" dockerfile = "Containerfile" tags = ["${REGISTRY}/electron-count-save"] diff --git a/operators/electron-count-save/Containerfile b/operators/electron-count-save/Containerfile index 00b33a4e..6ca6c846 100644 --- a/operators/electron-count-save/Containerfile +++ b/operators/electron-count-save/Containerfile @@ -1,4 +1,4 @@ -FROM interactem-operator +FROM distiller-streaming RUN poetry add stempy h5py diff --git a/operators/read-tem-data/operator.json b/operators/read-tem-data/operator.json index d6d2dbc4..59bafc2e 100644 --- a/operators/read-tem-data/operator.json +++ b/operators/read-tem-data/operator.json @@ -29,6 +29,14 @@ "required": true } ], + "triggers": [ + { + "name": "read_now", + "label": "Read data now", + "description": "Manually read and emit the data without waiting.", + "mode": "single" + } + ], "parallel_config": { "type": "none" } diff --git a/operators/read-tem-data/run.py b/operators/read-tem-data/run.py index 2778a9a9..f829d45d 100644 --- a/operators/read-tem-data/run.py +++ b/operators/read-tem-data/run.py @@ -1,9 +1,7 @@ -import time from pathlib import Path from typing import Any import ncempy -import numpy as np from interactem.core.logger import get_logger from interactem.core.models.messages import BytesMessage, MessageHeader, MessageSubject @@ -15,36 +13,40 @@ data_dir = Path(f"{DATA_DIRECTORY}/raw_data_dir") @operator -def read_data_pae( - inputs: BytesMessage | None, parameters: dict[str, Any] +def read_data_ncempy( + inputs: BytesMessage | None, parameters: dict[str, Any], trigger=None ) -> BytesMessage | None: - """This reads data from disk and sends it on.""" - - # This operator does not require inputs + """Read and emit TEM data from a specified file when triggered.""" + if trigger is None: + return None # Extract parameters - directory = parameters.get("directory", "/test_data") + directory = parameters.get("raw_data_dir", "/test_data") file = parameters.get("file", "test.emd") - # mount_dir = parameters.get("mount_dir", "~") - # TODO: Implement operator logic here + # Read data from disk logger.info("read_tem_data operator running...") - logger.info(f'directory: {directory}') - logger.info(f'mount directory: {data_dir}') - logger.info(f'file: {file}') - # file_path = Path(directory) / Path(file) + logger.info(f"parameter directory: {directory}") + logger.info(f"internal mount directory: {data_dir}") + logger.info(f"file name: {file}") + file_path = data_dir / Path(file) try: dd = ncempy.read(file_path) data = dd['data'] logger.info(f'file data shape: {data.shape}') - except Exception: - data = np.zeros((100, 100), dtype=np.uint8) - logger.info('Problem loading file. Using zeros array.') - time.sleep(3.0) + except Exception as e: + logger.info(f"Problem loading file. Error: {e}") + return None - # TODO: Process and return result + # Process and return result if the data was loaded successfully data_bytes = data.tobytes() - header = MessageHeader(subject=MessageSubject.BYTES, meta={'shape': data.shape, 'dtype': str(data.dtype)}) + header = MessageHeader( + subject=MessageSubject.BYTES, + meta={ + "shape": data.shape, + "dtype": str(data.dtype), + }, + ) return BytesMessage(header=header, data=data_bytes)