From a73b695f7c57475c82a6405d90f9b635b2c8bc6c Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Wed, 21 Jun 2023 20:00:42 +0200 Subject: [PATCH 01/14] add the option to not cache a new sample in our object pool; --- koi_core/resources/instance.py | 4 ++-- koi_core/resources/pool.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/koi_core/resources/instance.py b/koi_core/resources/instance.py index ea1d140..b8c2241 100644 --- a/koi_core/resources/instance.py +++ b/koi_core/resources/instance.py @@ -285,8 +285,8 @@ def __init__(self, pool: "APIObjectPool", id: Union[InstanceId, ModelId]) -> Non self.descriptors = InstanceDescriptorAccessor(self) self.parameter = InstanceParameterAccessor(self) - def new_sample(self): - sample = self.pool.new_sample(self.id) + def new_sample(self, remote_only:bool=False): + sample = self.pool.new_sample(self.id, not remote_only) return sample def merge(self, instances: Iterable): diff --git a/koi_core/resources/pool.py b/koi_core/resources/pool.py index 9c4aff9..c3bd6a0 100644 --- a/koi_core/resources/pool.py +++ b/koi_core/resources/pool.py @@ -154,7 +154,8 @@ def new_instance(self, id: ModelId) -> Instance: def sample(self, id: SampleId, meta) -> Sample: return SampleProxy(self, id), meta - def new_sample(self, id: InstanceId) -> Sample: + def new_sample(self, id: InstanceId, cached:bool=True) -> Sample: sample = SampleProxy(self, id) - setIndexedCache(self, "sample", sample.id, sample) + if cached: + setIndexedCache(self, "sample", sample.id, sample) return sample From 1d0fc7baca9e2498a02ed69d3d9f3bedb514d1a7 Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Wed, 21 Jun 2023 20:11:14 +0200 Subject: [PATCH 02/14] add an optional sleeptime for instance that just changed; --- koi_core/worker.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/koi_core/worker.py b/koi_core/worker.py index 269cb25..5e4f8bb 100644 --- a/koi_core/worker.py +++ b/koi_core/worker.py @@ -23,6 +23,8 @@ import koi_core as koi from koi_core.exceptions import KoiApiOfflineException from time import sleep +from datetime import datetime + signal_interrupt = False @@ -114,6 +116,13 @@ def main(): default=60, help="Number of seconds to wait before a retry", ) + p.add( + "-w", + "--wait-training", + type=int, + default=10, + help="Number of seconds a instance has to be untouched before atempting to train it", + ) opt = p.parse_args() @@ -193,9 +202,11 @@ def main(): instance.name, ) continue - + + # seconds since last instance update + sec_since_last_change = (datetime.utcnow() - instance.last_modified).total_seconds() # train the instance if its ready to train or the user forces it - if opt.force or instance.could_train: + if opt.force or (instance.could_train and sec_since_last_change > opt.wait_training): try: logging.info( "start to train instance %s/%s", model.name, instance.name From 5f8ff2e3e3c757eed510d39cafb54f93d7fff3e7 Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Wed, 21 Jun 2023 20:12:15 +0200 Subject: [PATCH 03/14] bump the version and edit the changelog; --- CHANGELOG.md | 5 ++++- pyproject.toml | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c6a62c..bb88450 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,4 +41,7 @@ ### 0.4.4 - removed unwanted 'continue' from koi-worker; added explicit cast for wait time ### 0.4.5 -- force a reconnect even of status variable indicates we are online \ No newline at end of file +- force a reconnect even of status variable indicates we are online +### 0.4.6 +- optionally skip the caching of new samples, usefull if we just want to send them and never consume or read back. +- added optional sleep time for instances that just changed, this prevents premature training of instances while someone else is writing to them# diff --git a/pyproject.toml b/pyproject.toml index 0f2a396..2e26c2b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "koi-core" -version = "0.4.5" +version = "0.4.6" description = "Runtime for the KOI-System including koi-worker" readme = "README.md" requires-python = ">=3.8,<3.11" From c6f38989d0add774a17552cf0bc603cecbb2046f Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Thu, 22 Jun 2023 20:57:23 +0200 Subject: [PATCH 04/14] parse the last_modified field of the instance to a datetime object; remove the wrong type hint to datetime, as the field is an uninterpreted string; --- koi_core/resources/instance.py | 3 +-- koi_core/worker.py | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/koi_core/resources/instance.py b/koi_core/resources/instance.py index b8c2241..4d7a099 100644 --- a/koi_core/resources/instance.py +++ b/koi_core/resources/instance.py @@ -13,7 +13,6 @@ # GNU Lesser General Public License is distributed along with this # software and can be found at http://www.gnu.org/licenses/lgpl.html -from datetime import datetime from koi_core.caching import cache, offlineFeature from koi_core.resources.model import Model from koi_core.resources.ids import InstanceId, ModelId, SampleId, DescriptorId @@ -183,7 +182,7 @@ class InstanceBasicFields: description: str finalized: bool could_train: bool - last_modified: datetime + last_modified: str class InstanceProxy(Instance): diff --git a/koi_core/worker.py b/koi_core/worker.py index 5e4f8bb..3562881 100644 --- a/koi_core/worker.py +++ b/koi_core/worker.py @@ -204,7 +204,8 @@ def main(): continue # seconds since last instance update - sec_since_last_change = (datetime.utcnow() - instance.last_modified).total_seconds() + last_modified = datetime.fromisoformat(instance.last_modified) + sec_since_last_change = (datetime.utcnow() - last_modified).total_seconds() # train the instance if its ready to train or the user forces it if opt.force or (instance.could_train and sec_since_last_change > opt.wait_training): try: From 6e8fc117ddfb0f604f9086c4b4edbdeb619c2e51 Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Mon, 26 Jun 2023 21:38:56 +0200 Subject: [PATCH 05/14] do not use pickle to ensure interop between different python versions; --- koi_core/data/simple_accessor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/koi_core/data/simple_accessor.py b/koi_core/data/simple_accessor.py index 8568bd7..4a53e5d 100644 --- a/koi_core/data/simple_accessor.py +++ b/koi_core/data/simple_accessor.py @@ -23,12 +23,12 @@ @cache def get_np(self: Union[SampleDatum, SampleLabel], meta) -> np.ndarray: f = BytesIO(self.raw) - return np.load(f), meta + return np.load(f, allow_pickle=False), meta def set_np(self: Union[SampleDatum, SampleLabel], value: np.ndarray) -> None: f = BytesIO() - np.save(f, value) + np.save(f, value, allow_pickle=False) self.raw = f.getvalue() setCache(self, 'np', None) From 8c072e8a4d5db05832fe5babad03efcf665cf0c2 Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Mon, 26 Jun 2023 21:40:13 +0200 Subject: [PATCH 06/14] add samples_last_modified to instance; use it in worker; --- koi_core/api/instance.py | 4 +--- koi_core/worker.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/koi_core/api/instance.py b/koi_core/api/instance.py index b3b5ae4..e8a722c 100644 --- a/koi_core/api/instance.py +++ b/koi_core/api/instance.py @@ -28,6 +28,7 @@ "finalized": "finalized", "could_train": "could_train", "last_modified": "last_modified", + "samples_last_modified": "samples_last_modified", } @@ -96,9 +97,6 @@ def get_instance_training_data(self, id: InstanceId, meta: CachingMeta): def set_instance_training_data(self, id: InstanceId, data: bytes): self.base._POST_raw(self.base._build_path(id) + "/training", data=data) - # endregion - - # region descriptor def get_descriptors(self, id: InstanceId, meta: CachingMeta = None): data, meta = self.base._GET(self.base._build_path(id) + "/descriptor") return ( diff --git a/koi_core/worker.py b/koi_core/worker.py index 3562881..52d367f 100644 --- a/koi_core/worker.py +++ b/koi_core/worker.py @@ -204,7 +204,7 @@ def main(): continue # seconds since last instance update - last_modified = datetime.fromisoformat(instance.last_modified) + last_modified = datetime.fromisoformat(instance.samples_last_modified) sec_since_last_change = (datetime.utcnow() - last_modified).total_seconds() # train the instance if its ready to train or the user forces it if opt.force or (instance.could_train and sec_since_last_change > opt.wait_training): From 3eff1e5f90c7b0059349e3227a02edd9534e5ce1 Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Fri, 22 Sep 2023 21:12:48 +0200 Subject: [PATCH 07/14] add missing field; fix the wrong field name for sample_last_mdofied; --- koi_core/api/instance.py | 2 +- koi_core/resources/instance.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/koi_core/api/instance.py b/koi_core/api/instance.py index e8a722c..64ff970 100644 --- a/koi_core/api/instance.py +++ b/koi_core/api/instance.py @@ -28,7 +28,7 @@ "finalized": "finalized", "could_train": "could_train", "last_modified": "last_modified", - "samples_last_modified": "samples_last_modified", + "sample_last_modified": "sample_last_modified", } diff --git a/koi_core/resources/instance.py b/koi_core/resources/instance.py index 4d7a099..8461bbf 100644 --- a/koi_core/resources/instance.py +++ b/koi_core/resources/instance.py @@ -183,6 +183,7 @@ class InstanceBasicFields: finalized: bool could_train: bool last_modified: str + sample_last_modified: str class InstanceProxy(Instance): From 82cec725419aa0c2107567045e9a78ac8036edfe Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Fri, 22 Sep 2023 21:33:00 +0200 Subject: [PATCH 08/14] update the worker to use the correct field name; add info for when we skip because of still changing data sets; --- koi_core/worker.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/koi_core/worker.py b/koi_core/worker.py index 52d367f..f2fdd63 100644 --- a/koi_core/worker.py +++ b/koi_core/worker.py @@ -204,7 +204,7 @@ def main(): continue # seconds since last instance update - last_modified = datetime.fromisoformat(instance.samples_last_modified) + last_modified = datetime.fromisoformat(instance.sample_last_modified) sec_since_last_change = (datetime.utcnow() - last_modified).total_seconds() # train the instance if its ready to train or the user forces it if opt.force or (instance.could_train and sec_since_last_change > opt.wait_training): @@ -219,6 +219,13 @@ def main(): logging.exception( "instance %s/%s had an exception", model.name, instance.name ) + else: + if (instance.could_train and sec_since_last_change < opt.wait_training): + logging.info( + "skipping instance %s/%s, as data was not present or still changing.", + model.name, + instance.name + ) # break here if the user selected to run once if opt.once: From 1fc20fc7c78f48d5d439aac2a38e20f913fbdf94 Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Fri, 29 Sep 2023 20:02:06 +0200 Subject: [PATCH 09/14] kill an instance if it ever got an exception, this will force an reset of its associated runnable_instance; --- koi_core/control/control.py | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/koi_core/control/control.py b/koi_core/control/control.py index 91c8936..cbde4d4 100644 --- a/koi_core/control/control.py +++ b/koi_core/control/control.py @@ -60,6 +60,8 @@ def _set_instance(instance: Instance, max_instances: int = 1): def train(instance: Instance, batch_iterable=None, dev=False, max_instances=1): + global _runable_instance + if dev: temp_dir = TemporaryDirectory() model = instance.load_code(temp_dir.name) @@ -67,10 +69,16 @@ def train(instance: Instance, batch_iterable=None, dev=False, max_instances=1): temp_dir.cleanup() else: _set_instance(instance, max_instances) - _runable_instance.train(batch_iterable) + try: + _runable_instance.train(batch_iterable) + except: + kill_runable_instance() + raise def infer(instance: Instance, data, dev=False, model=None, max_instances=1) -> List[Any]: + global _runable_instance + if dev: temp_dir = None @@ -86,7 +94,27 @@ def infer(instance: Instance, data, dev=False, model=None, max_instances=1) -> L else: _set_instance(instance, max_instances) - return _runable_instance.infer(data) + try: + return _runable_instance.infer(data) + except: + kill_runable_instance() + raise + + +def kill_runable_instance(): + global _runable_instance + global _active_instances + + if _runable_instance is None: + return + + instance = _runable_instance.instance + + _runable_instance.terminate() + _runable_instance = None + + if instance in _active_instances: + del _active_instances[instance] def terminate(): From a62bbdf9db2442a5ae80439f1d23b99af8ba6480 Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Fri, 29 Sep 2023 20:02:29 +0200 Subject: [PATCH 10/14] catch if the reconnect has an exception; --- koi_core/worker.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/koi_core/worker.py b/koi_core/worker.py index f2fdd63..c8cec5e 100644 --- a/koi_core/worker.py +++ b/koi_core/worker.py @@ -251,7 +251,10 @@ def main(): sleep((float)(opt.sleep_retry)) # set the online flag and try to athenticate - pool.api.reconnect() + try: + pool.api.reconnect() + except: + logging.error("could not reconnect to koi api") logging.info("stopped") sys.exit(0) From ecc822d82e6637091b30c581af3160eda79b18ed Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Fri, 29 Sep 2023 20:05:06 +0200 Subject: [PATCH 11/14] kill the worker with exit code -1; --- koi_core/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/koi_core/worker.py b/koi_core/worker.py index c8cec5e..75f2f28 100644 --- a/koi_core/worker.py +++ b/koi_core/worker.py @@ -244,7 +244,7 @@ def main(): if retries == 0: # if the retry counter is initilizes with a negative value, we will try forever logging.error("koi api is offline, giving up") - raise ex + sys.exit(-1) retries -= 1 logging.error("koi api is offline, retrying in %d seconds", opt.sleep_retry) From f52246a403da8c97c3789d63dee59a972fd37f81 Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Fri, 29 Sep 2023 21:31:56 +0200 Subject: [PATCH 12/14] upgrade dockerfile to tensorflow 2.13; --- cpu-tf.Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpu-tf.Dockerfile b/cpu-tf.Dockerfile index 4bdb701..5da0ad3 100644 --- a/cpu-tf.Dockerfile +++ b/cpu-tf.Dockerfile @@ -3,7 +3,7 @@ # convenience, we added tensorflow-hub to the image, so you can use pretrained # models from TensorFlow Hub. -FROM tensorflow/tensorflow:2.11.0 +FROM tensorflow/tensorflow:2.13.0 COPY ./dist/ /wheels/ From bb77000db7001aaa92cf305f27caa800024e85ed Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Sat, 30 Sep 2023 12:31:50 +0200 Subject: [PATCH 13/14] allow the use of 3.11; --- pyproject.toml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 2e26c2b..5c95daf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,12 +2,15 @@ requires = ["setuptools>=61.0.0"] build-backend = "setuptools.build_meta" +[tools.setuptools] +packages = ["koi_core"] + [project] name = "koi-core" version = "0.4.6" description = "Runtime for the KOI-System including koi-worker" readme = "README.md" -requires-python = ">=3.8,<3.11" +requires-python = ">=3.8,<3.12" classifiers = ["Programming Language :: Python :: 3", "Operating System :: OS Independent", "License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)"] dependencies = [ From 9beece9f3a2c95d5c1d489f02f2601ce9dac0324 Mon Sep 17 00:00:00 2001 From: Johannes Richter Date: Sat, 30 Sep 2023 14:03:07 +0200 Subject: [PATCH 14/14] explicitly include all submodules; --- pyproject.toml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5c95daf..7c8bdaf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,13 @@ requires = ["setuptools>=61.0.0"] build-backend = "setuptools.build_meta" [tools.setuptools] -packages = ["koi_core"] +packages = [ + "koi_core", + "koi_core.api", + "koi_core.control", + "koi_core.data", + "koi_core.resources" +] [project] name = "koi-core"