-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbasic_clockwork.py
More file actions
40 lines (30 loc) · 1.03 KB
/
Copy pathbasic_clockwork.py
File metadata and controls
40 lines (30 loc) · 1.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/env python3
"""Basic MetaFlow Clockwork example using the current public API."""
from metaflow_clockwork import ClockworkEngine, MetaTag, MetaTagType, RecordingEventSink
def main() -> None:
    """Drive a one-gear clockwork engine for two ticks and print the results.

    Creates a recording event sink, an engine, and a single GEAR tag that
    share that sink; registers a one-shot spawning function on the tag;
    adds the tag to the engine as a root gear; then prints the value
    returned by each of two ``engine.tick()`` calls, followed by the
    ``kind`` of every event found in ``sink.events``.
    """
    recorder = RecordingEventSink()
    engine = ClockworkEngine(
        event_sink=recorder,
        run_id="basic-clockwork",
        request_id="example-request",
    )
    root_gear = MetaTag(
        tag_id="root-gear",
        tag_type=MetaTagType.GEAR,
        data={"label": "basic-clockwork"},
        event_sink=recorder,
    )

    def spawn_once(tag: MetaTag):
        """Spawn a single COG child of *tag*, at most once per tag.

        Returns a one-element list holding the child, or an empty list when
        ``tag.data["spawned"]`` is already truthy or ``spawn_child`` hands
        back a falsy value.
        """
        if tag.data.get("spawned"):
            return []

        # The marker is written before the spawn attempt, so a later call
        # short-circuits even if this attempt yields no child.
        tag.data["spawned"] = True
        cog = tag.spawn_child(
            MetaTagType.COG,
            tag_id="child-cog",
            data={"label": "worker"},
        )
        if not cog:
            return []
        return [cog]

    root_gear.add_function("spawn_once", spawn_once)
    engine.add_root_gear(root_gear)

    # Two ticks: print whatever each tick returns.
    for _ in range(2):
        print(engine.tick())

    event_kinds = [event.kind for event in recorder.events]
    print(event_kinds)
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()