-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample.py
More file actions
174 lines (141 loc) Β· 6.05 KB
/
Copy pathexample.py
File metadata and controls
174 lines (141 loc) Β· 6.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#!/usr/bin/env python3
"""
Example usage of the Claw Bot Swarm Communication System.

This example demonstrates:
1. Creating a leader bot (Conductor)
2. Adding follower bots to the swarm
3. Assigning tasks to bots
4. Monitoring swarm status
5. Dynamically adding more bots as needed
"""
from leader_bot import LeaderBot
from follower_bot import FollowerBot
from swarm_coordinator import SwarmCoordinator
import time
def print_separator():
    """Print a 60-character rule of '=' with a blank line above and below it."""
    rule = "=" * 60
    print(f"\n{rule}\n")
def main():
    """Run the scripted swarm demo end to end, printing progress to stdout.

    The walkthrough creates a ``SwarmCoordinator``, installs a ``LeaderBot``
    as its leader, registers ``FollowerBot`` instances, assigns and broadcasts
    tasks, drains the leader's task queue, prints swarm status snapshots, and
    finally removes one follower. Task execution is simulated: the script
    sleeps briefly and then reports each task as complete itself.
    """
    print("🤖 Claw Bot Swarm Communication System Demo")
    print_separator()

    # Step 1: Create the swarm coordinator
    print("Step 1: Initializing Swarm Coordinator...")
    coordinator = SwarmCoordinator()
    print_separator()

    # Step 2: Create and set the leader bot
    print("Step 2: Creating Leader Bot (Conductor)...")
    leader = LeaderBot(name="Conductor-Alpha")
    coordinator.set_leader(leader)
    print_separator()

    # Step 3: Add initial follower bots
    print("Step 3: Adding Initial Follower Bots...")
    follower1 = FollowerBot(leader.bot_id, name="ClawBot-1")
    follower2 = FollowerBot(leader.bot_id, name="ClawBot-2")
    coordinator.add_follower(follower1)
    coordinator.add_follower(follower2)
    print_separator()

    # Step 4: Check initial swarm status
    print("Step 4: Initial Swarm Status")
    status = coordinator.get_swarm_status()
    print(f"Leader: {status['leader']['name']}")
    print(f"Total Followers: {status['total_followers']}")
    print(f"Idle Followers: {status['idle_followers']}")
    print(f"Busy Followers: {status['busy_followers']}")
    print("\nFollowers:")
    for follower in status['followers']:
        print(f" - {follower['name']}: {follower['status']}")
    print_separator()

    # Step 5: Assign tasks to specific bots
    print("Step 5: Assigning Specific Tasks...")
    task1 = {
        'description': 'Pick up object at position (10, 20)',
        'action': 'pickup',
        'position': (10, 20),
        'duration': 0.5  # Simulated task duration
    }
    task2 = {
        'description': 'Move to position (30, 40)',
        'action': 'move',
        'position': (30, 40),
        'duration': 0.3
    }
    coordinator.assign_task_from_leader(follower1.bot_id, task1)
    time.sleep(0.1)  # Small delay for demonstration
    coordinator.assign_task_from_leader(follower2.bot_id, task2)

    # Wait for tasks to complete
    time.sleep(1)

    # Report completions
    coordinator.report_task_complete(follower1.bot_id, task1)
    coordinator.report_task_complete(follower2.bot_id, task2)
    print_separator()

    # Step 6: Broadcast a task to any available bot
    print("Step 6: Broadcasting Task to Available Bots...")
    task3 = {
        'description': 'Scan area for objects',
        'action': 'scan',
        'area': 'zone-A',
        'duration': 0.4
    }
    coordinator.broadcast_task_from_leader(task3)
    time.sleep(0.5)
    # NOTE(review): completion is reported for follower1 without checking
    # which bot the broadcast actually selected - confirm against
    # broadcast_task_from_leader before relying on this.
    coordinator.report_task_complete(follower1.bot_id, task3)
    print_separator()

    # Step 7: Add more bots dynamically
    print("Step 7: Dynamically Adding More Bots...")
    follower3 = FollowerBot(leader.bot_id, name="ClawBot-3")
    follower4 = FollowerBot(leader.bot_id, name="ClawBot-4")
    coordinator.add_follower(follower3)
    coordinator.add_follower(follower4)
    print_separator()

    # Step 8: Queue multiple tasks and process them
    print("Step 8: Queuing Multiple Tasks...")
    tasks = [
        {'description': 'Sort objects in bin 1', 'action': 'sort', 'duration': 0.3},
        {'description': 'Transport items to zone B', 'action': 'transport', 'duration': 0.3},
        {'description': 'Clean workspace area', 'action': 'clean', 'duration': 0.3},
        {'description': 'Return to home position', 'action': 'return_home', 'duration': 0.2},
    ]
    for task in tasks:
        leader.add_task_to_queue(task)

    print("\nProcessing queued tasks...")
    coordinator.process_queued_tasks()

    # Wait for some tasks to complete
    time.sleep(1)

    # Simulate task completions. The demo does not track which follower
    # received which queued task, so pair the queued tasks with follower ids
    # in the order the coordinator's follower mapping yields them; zip stops
    # at the shorter of the two. The ids are snapshotted once so the pairing
    # is unaffected if reporting a completion touches the mapping.
    follower_ids = list(coordinator.followers)
    for follower_id, task in zip(follower_ids, tasks):
        coordinator.report_task_complete(follower_id, task)
    print_separator()

    # Step 9: Final swarm status
    print("Step 9: Final Swarm Status")
    final_status = coordinator.get_swarm_status()
    print(f"Leader: {final_status['leader']['name']}")
    print(f"Total Followers: {final_status['total_followers']}")
    print(f"Idle Followers: {final_status['idle_followers']}")
    print(f"Busy Followers: {final_status['busy_followers']}")
    print(f"Completed Tasks: {final_status['completed_tasks']}")
    print("\nAll Followers:")
    for follower in final_status['followers']:
        print(f" - {follower['name']}: {follower['status']}")
    print_separator()

    # Step 10: Remove a bot from the swarm
    print("Step 10: Removing a Bot from the Swarm...")
    coordinator.remove_follower(follower4.bot_id)
    status_after_removal = coordinator.get_swarm_status()
    print(f"Remaining Followers: {status_after_removal['total_followers']}")
    print_separator()

    print("✅ Demo Complete!")
    print("\nKey Features Demonstrated:")
    print(" ✓ Leader bot (Conductor) managing the swarm")
    print(" ✓ Multiple follower bots executing tasks")
    print(" ✓ Task assignment (specific and broadcast)")
    print(" ✓ Dynamic bot addition and removal")
    print(" ✓ Task queuing and processing")
    print(" ✓ Status monitoring and reporting")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()