-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbasic_autonomous_agent.py
More file actions
120 lines (89 loc) · 3.77 KB
/
Copy pathbasic_autonomous_agent.py
File metadata and controls
120 lines (89 loc) · 3.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
"""
This module provides an NPC agent to control the ego vehicle
"""
from __future__ import print_function
import os
import carla
from behavior_agent import BehaviorAgent
from srunner.scenariomanager.carla_data_provider import CarlaDataProvider
import importlib
from leaderboard.autoagents.autonomous_agent import AutonomousAgent, Track
import json
from utils import Streamer
def get_entry_point():
return 'MyTeamAgent'
class MyTeamAgent(AutonomousAgent):
"""
NPC autonomous agent to control the ego vehicle
"""
_agent = None
_route_assigned = False
def setup(self, path_to_conf_file):
"""
Setup the agent parameters
"""
self.track = Track.SENSORS
self._agent = None
with open(path_to_conf_file, "r") as f:
self.configs = json.load(f)
f.close()
self.__show = len(self.configs["Visualizer_IP"]) > 0
if self.__show:
self.showServer = Streamer(self.configs["Visualizer_IP"])
def sensors(self):
"""
Define the sensor suite required by the agent
:return: a list containing the required sensors in the following format:
[
{'type': 'sensor.camera.rgb', 'x': 0.7, 'y': -0.4, 'z': 1.60, 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
'width': 300, 'height': 200, 'fov': 100, 'id': 'Left'},
{'type': 'sensor.camera.rgb', 'x': 0.7, 'y': 0.4, 'z': 1.60, 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0,
'width': 300, 'height': 200, 'fov': 100, 'id': 'Right'},
{'type': 'sensor.lidar.ray_cast', 'x': 0.7, 'y': 0.0, 'z': 1.60, 'yaw': 0.0, 'pitch': 0.0, 'roll': 0.0,
'id': 'LIDAR'}
]
"""
return self.configs["sensors"]
def set_global_plan(self, global_plan_gps, global_plan_world_coord):
"""
Set the plan (route) for the agent
"""
self._global_plan_world_coord = global_plan_world_coord
self._global_plan = global_plan_gps
def run_step(self, input_data, timestamp):
"""
Execute one step of navigation.
"""
if not self._agent:
# Search for the ego actor
hero_actor = None
for actor in CarlaDataProvider.get_world().get_actors():
if 'role_name' in actor.attributes and actor.attributes['role_name'] == 'hero':
hero_actor = actor
if not hero_actor:
return carla.VehicleControl()
self._agent = BehaviorAgent(hero_actor, opt_dict=self.configs)
plan = [(CarlaDataProvider.get_map().get_waypoint(x[0].location),x[1]) for x in self._global_plan_world_coord]
self._agent.set_global_plan(plan)
return carla.VehicleControl()
else:
controls = self._agent.run_step()
if self.__show:
self.showServer.send_frame("RGB", input_data["Center"][1])
self.showServer.send_data("Controls",{
"steer":controls.steer,
"throttle":controls.throttle,
"brake": controls.brake,
})
if len(self.configs["SaveSpeedData"]) > 0:
with open(os.path.join(os.getcwd(), self.configs["SaveSpeedData"]), "a") as fp:
fp.write(str(timestamp)+";"+str(input_data["Speed"][1]["speed"] * 3.6)+";"+str(self.configs["target_speed"])+"\n")
fp.close()
return controls
def destroy(self):
print("DESTROY")
if self._agent:
self._agent.reset()