-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhelpers.py
More file actions
59 lines (44 loc) · 1.5 KB
/
Copy pathhelpers.py
File metadata and controls
59 lines (44 loc) · 1.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import os
from composio import Composio
from composio_openai_agents import OpenAIAgentsProvider
from dotenv import load_dotenv
# Load variables from a .env file (python-dotenv) so the os.getenv lookups
# in the helpers below can see them.
load_dotenv()
# Cache of Composio clients, keyed by the COMPOSIO_API_KEY value that was
# configured when each client was created.
_composio_clients: dict[str, Composio] = {}
def get_composio_user_id() -> str | None:
for key in ("COMPOSIO_USER_ID", "USER_ID"):
value = os.getenv(key)
if value:
return str(value)
return None
def get_composio_client() -> Composio | None:
    """Return a cached Composio client for the configured API key.

    Reads ``COMPOSIO_API_KEY`` from the environment. One client is created
    per distinct key (using the OpenAI Agents provider) and stored in
    ``_composio_clients`` so repeated calls reuse the same instance.

    Returns:
        The client for the current key, or ``None`` when
        ``COMPOSIO_API_KEY`` is unset or empty.
    """
    api_key = os.getenv("COMPOSIO_API_KEY")
    if not api_key:
        return None

    client = _composio_clients.get(api_key)
    if client is None:
        # Hand the key to the SDK explicitly so the client is bound to the
        # same key it is cached under, instead of relying on the SDK to
        # re-read the environment on its own.
        client = Composio(api_key=api_key, provider=OpenAIAgentsProvider())
        _composio_clients[api_key] = client
    return client
def execute_composio_tool(tool_name: str, arguments: dict):
    """Execute a Composio tool on behalf of the configured user.

    Args:
        tool_name: Identifier of the tool, forwarded as the first positional
            argument to ``tools.execute``.
        arguments: Arguments forwarded to the tool unchanged.

    Returns:
        Whatever ``tools.execute`` returns, or a ``{"error": ...}`` dict when
        the API key or the user id is not configured.
    """
    client = get_composio_client()
    account = get_composio_user_id()

    # Checked in order: a missing API key is reported before a missing user id.
    prerequisites = (
        (client, "COMPOSIO_API_KEY is not set."),
        (account, "COMPOSIO_USER_ID is not set."),
    )
    for value, message in prerequisites:
        if not value:
            return {"error": message}

    call_options = {
        "user_id": account,
        "arguments": arguments,
        # NOTE(review): presumably bypasses the SDK's toolkit version check;
        # confirm against the Composio SDK documentation.
        "dangerously_skip_version_check": True,
    }
    return client.tools.execute(tool_name, **call_options)
def get_composio_tools(**kwargs):
    """Fetch Composio tools for the configured user.

    Args:
        **kwargs: Passed straight through to ``tools.get`` after the user id.

    Returns:
        Whatever ``tools.get`` returns, or a ``{"error": ...}`` dict when the
        API key or the user id is not configured.
    """
    client = get_composio_client()
    account = get_composio_user_id()

    if not client:
        problem = "COMPOSIO_API_KEY is not set."
    elif not account:
        problem = "COMPOSIO_USER_ID is not set."
    else:
        return client.tools.get(account, **kwargs)
    return {"error": problem}
# Module-level convenience values, resolved once at import time from the
# environment. Each is None when its environment variable is not set.
user_id = get_composio_user_id()
composio = get_composio_client()