diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c6a62c..bb88450 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,4 +41,7 @@ ### 0.4.4 - removed unwanted 'continue' from koi-worker; added explicit cast for wait time ### 0.4.5 -- force a reconnect even of status variable indicates we are online \ No newline at end of file +- force a reconnect even of status variable indicates we are online +### 0.4.6 +- optionally skip the caching of new samples, usefull if we just want to send them and never consume or read back. +- added optional sleep time for instances that just changed, this prevents premature training of instances while someone else is writing to them# diff --git a/cpu-tf.Dockerfile b/cpu-tf.Dockerfile index 4bdb701..5da0ad3 100644 --- a/cpu-tf.Dockerfile +++ b/cpu-tf.Dockerfile @@ -3,7 +3,7 @@ # convenience, we added tensorflow-hub to the image, so you can use pretrained # models from TensorFlow Hub. -FROM tensorflow/tensorflow:2.11.0 +FROM tensorflow/tensorflow:2.13.0 COPY ./dist/ /wheels/ diff --git a/koi_core/api/instance.py b/koi_core/api/instance.py index b3b5ae4..64ff970 100644 --- a/koi_core/api/instance.py +++ b/koi_core/api/instance.py @@ -28,6 +28,7 @@ "finalized": "finalized", "could_train": "could_train", "last_modified": "last_modified", + "sample_last_modified": "sample_last_modified", } @@ -96,9 +97,6 @@ def get_instance_training_data(self, id: InstanceId, meta: CachingMeta): def set_instance_training_data(self, id: InstanceId, data: bytes): self.base._POST_raw(self.base._build_path(id) + "/training", data=data) - # endregion - - # region descriptor def get_descriptors(self, id: InstanceId, meta: CachingMeta = None): data, meta = self.base._GET(self.base._build_path(id) + "/descriptor") return ( diff --git a/koi_core/control/control.py b/koi_core/control/control.py index 91c8936..cbde4d4 100644 --- a/koi_core/control/control.py +++ b/koi_core/control/control.py @@ -60,6 +60,8 @@ def _set_instance(instance: Instance, max_instances: int = 1): def train(instance: Instance, batch_iterable=None, dev=False, max_instances=1): + global _runable_instance + if dev: temp_dir = TemporaryDirectory() model = instance.load_code(temp_dir.name) @@ -67,10 +69,16 @@ def train(instance: Instance, batch_iterable=None, dev=False, max_instances=1): temp_dir.cleanup() else: _set_instance(instance, max_instances) - _runable_instance.train(batch_iterable) + try: + _runable_instance.train(batch_iterable) + except: + kill_runable_instance() + raise def infer(instance: Instance, data, dev=False, model=None, max_instances=1) -> List[Any]: + global _runable_instance + if dev: temp_dir = None @@ -86,7 +94,27 @@ def infer(instance: Instance, data, dev=False, model=None, max_instances=1) -> L else: _set_instance(instance, max_instances) - return _runable_instance.infer(data) + try: + return _runable_instance.infer(data) + except: + kill_runable_instance() + raise + + +def kill_runable_instance(): + global _runable_instance + global _active_instances + + if _runable_instance is None: + return + + instance = _runable_instance.instance + + _runable_instance.terminate() + _runable_instance = None + + if instance in _active_instances: + del _active_instances[instance] def terminate(): diff --git a/koi_core/data/simple_accessor.py b/koi_core/data/simple_accessor.py index 8568bd7..4a53e5d 100644 --- a/koi_core/data/simple_accessor.py +++ b/koi_core/data/simple_accessor.py @@ -23,12 +23,12 @@ @cache def get_np(self: Union[SampleDatum, SampleLabel], meta) -> np.ndarray: f = BytesIO(self.raw) - return np.load(f), meta + return np.load(f, allow_pickle=False), meta def set_np(self: Union[SampleDatum, SampleLabel], value: np.ndarray) -> None: f = BytesIO() - np.save(f, value) + np.save(f, value, allow_pickle=False) self.raw = f.getvalue() setCache(self, 'np', None) diff --git a/koi_core/resources/instance.py b/koi_core/resources/instance.py index ea1d140..8461bbf 100644 --- a/koi_core/resources/instance.py +++ b/koi_core/resources/instance.py @@ -13,7 +13,6 @@ # GNU Lesser General Public License is distributed along with this # software and can be found at http://www.gnu.org/licenses/lgpl.html -from datetime import datetime from koi_core.caching import cache, offlineFeature from koi_core.resources.model import Model from koi_core.resources.ids import InstanceId, ModelId, SampleId, DescriptorId @@ -183,7 +182,8 @@ class InstanceBasicFields: description: str finalized: bool could_train: bool - last_modified: datetime + last_modified: str + sample_last_modified: str class InstanceProxy(Instance): @@ -285,8 +285,8 @@ def __init__(self, pool: "APIObjectPool", id: Union[InstanceId, ModelId]) -> Non self.descriptors = InstanceDescriptorAccessor(self) self.parameter = InstanceParameterAccessor(self) - def new_sample(self): - sample = self.pool.new_sample(self.id) + def new_sample(self, remote_only:bool=False): + sample = self.pool.new_sample(self.id, not remote_only) return sample def merge(self, instances: Iterable): diff --git a/koi_core/resources/pool.py b/koi_core/resources/pool.py index 9c4aff9..c3bd6a0 100644 --- a/koi_core/resources/pool.py +++ b/koi_core/resources/pool.py @@ -154,7 +154,8 @@ def new_instance(self, id: ModelId) -> Instance: def sample(self, id: SampleId, meta) -> Sample: return SampleProxy(self, id), meta - def new_sample(self, id: InstanceId) -> Sample: + def new_sample(self, id: InstanceId, cached:bool=True) -> Sample: sample = SampleProxy(self, id) - setIndexedCache(self, "sample", sample.id, sample) + if cached: + setIndexedCache(self, "sample", sample.id, sample) return sample diff --git a/koi_core/worker.py b/koi_core/worker.py index 269cb25..75f2f28 100644 --- a/koi_core/worker.py +++ b/koi_core/worker.py @@ -23,6 +23,8 @@ import koi_core as koi from koi_core.exceptions import KoiApiOfflineException from time import sleep +from datetime import datetime + signal_interrupt = False @@ -114,6 +116,13 @@ def main(): default=60, help="Number of seconds to wait before a retry", ) + p.add( + "-w", + "--wait-training", + type=int, + default=10, + help="Number of seconds a instance has to be untouched before atempting to train it", + ) opt = p.parse_args() @@ -193,9 +202,12 @@ def main(): instance.name, ) continue - + + # seconds since last instance update + last_modified = datetime.fromisoformat(instance.sample_last_modified) + sec_since_last_change = (datetime.utcnow() - last_modified).total_seconds() # train the instance if its ready to train or the user forces it - if opt.force or instance.could_train: + if opt.force or (instance.could_train and sec_since_last_change > opt.wait_training): try: logging.info( "start to train instance %s/%s", model.name, instance.name @@ -207,6 +219,13 @@ def main(): logging.exception( "instance %s/%s had an exception", model.name, instance.name ) + else: + if (instance.could_train and sec_since_last_change < opt.wait_training): + logging.info( + "skipping instance %s/%s, as data was not present or still changing.", + model.name, + instance.name + ) # break here if the user selected to run once if opt.once: @@ -225,14 +244,17 @@ def main(): if retries == 0: # if the retry counter is initilizes with a negative value, we will try forever logging.error("koi api is offline, giving up") - raise ex + sys.exit(-1) retries -= 1 logging.error("koi api is offline, retrying in %d seconds", opt.sleep_retry) sleep((float)(opt.sleep_retry)) # set the online flag and try to athenticate - pool.api.reconnect() + try: + pool.api.reconnect() + except: + logging.error("could not reconnect to koi api") logging.info("stopped") sys.exit(0) diff --git a/pyproject.toml b/pyproject.toml index 0f2a396..7c8bdaf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,12 +2,21 @@ requires = ["setuptools>=61.0.0"] build-backend = "setuptools.build_meta" +[tools.setuptools] +packages = [ + "koi_core", + "koi_core.api", + "koi_core.control", + "koi_core.data", + "koi_core.resources" +] + [project] name = "koi-core" -version = "0.4.5" +version = "0.4.6" description = "Runtime for the KOI-System including koi-worker" readme = "README.md" -requires-python = ">=3.8,<3.11" +requires-python = ">=3.8,<3.12" classifiers = ["Programming Language :: Python :: 3", "Operating System :: OS Independent", "License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)"] dependencies = [