Create Inactive Mailbox Status Using Heartbeats#404
Conversation
| from academy.identifier import AgentT | ||
|
|
||
| # Mailbox is inactive after no recorded activity for 2 minutes. | ||
| HEARTBEAT_STALE_THRESHOLD: float = 120 |
There was a problem hiding this comment.
Can we make this configurable (i.e. a parameter to most of the exchange factories. For the HttpExchangeFactory and the GlobusExchangeFactory I don't think it will work, I think we'll have to put it in exchange/cloud/config.py).
I also think this is probably better expressed as a number of heartbeats --- that leaves less room for configuration errors like "I set my heartbeat at 60s and my threshold at 30s".
|
Close #366 |
934ce84 to
a047e2c
Compare
| host=host, | ||
| port=port, | ||
| logger=LogConfig(level=level), | ||
| heartbeat_stale_threshold_s=heartbeat_stale_threshold_s, |
There was a problem hiding this comment.
Is this used anywhere? If we want it to be configurable per client, we could have it be passed as a parameter to the status endpoint of the app?
There was a problem hiding this comment.
Fixed in latest commit - now a parameter in the status end point, but also kept config's connection as a fallback
There was a problem hiding this comment.
Regarding the comment about why its used - i think this would be helpful if someone wanted to change the threshold globally via the config, which then via create_app, which calls backend = config.backend.get_backend( heartbeat_stale_threshold_s=config.heartbeat_stale_threshold_s, ), would then propagate that change to the transport
however, if they decide to not set it, then the default value here def __init__( self, message_size_limit_kb: int = 1024, heartbeat_stale_threshold_s: float = DEFAULT_THRESHOLD_S, ) will take over
| async def _heartbeat_loop(self) -> None: | ||
| heartbeat_interval: int = 60 | ||
| # Only runs for local exchange | ||
| if not hasattr(self._transport, 'heartbeat_interval_s'): |
There was a problem hiding this comment.
It looks like the local exchange has this property. so what is this trying to catch?
There was a problem hiding this comment.
I meant that this skips anything that's not non-cloud exchange, so heartbeat_interval = self._transport.heartbeat_interval_s only runs for non-cloud exchanges
| return MailboxStatus.TERMINATED | ||
| else: | ||
| return MailboxStatus.ACTIVE | ||
| last_heartbeat = await self.heartbeat_status(uid) |
There was a problem hiding this comment.
Hmm, I'm trying to think about how we minimize the repeated code. It seems every transport follows the same pattern, and we create the heartbeat_interval_s inside every transport. What if instead hearbeat_stale_periods became an attribute of the client. And inside the clientstatus which used to just route messages to the transport, we did both a status check, then a conditional heartbeat check? That way we create this logic just once?
There was a problem hiding this comment.
hmm, I thought about this for a bit, and while it is true that we should be minimizing code, i don't think using the client's status directly to resolve status checks for INACTIVE makes the most sense?
The issue that I thought about is that people in the future (or perhaps right now) might write something that hits the transport's status directly and do something with the return value (even though that isn't an issue for now, given nothing depends on INACTIVE yet). thus, if we advertise each transport's status as returning the whole range of MailboxStatus, which is what it seems like, then it could be misleading given that status actually doesn't return one of the four values?
Maybe we could write something like a helper function in transport.py and then use that in each of the transports to minimize the code for the resolution logic, but still keep the idea that each transport's status returns what its supposed to?
There was a problem hiding this comment.
What if we moved the entire "status" abstraction out of the transport? Then transport would only be resposible for implementing heartbeat_status, and the client turns that time/error into "MISSING", "ACTIVE", "INACTIVE" or "TERMINATED"? This would avoid a redundant call to the transport and minimize the implementation of each transport
AK2000
left a comment
There was a problem hiding this comment.
This looks good! I think the main thing is moving some fields into exchange/client.py instead of each transport, but I think the logic is there and I do like the way this change has improved checking status
| status = await self._redis_client.get(self._status_key(uid)) | ||
| if status is None: | ||
| raise BadEntityIdError(uid) | ||
| elif status.decode() == _MailboxState.INACTIVE.value: |
There was a problem hiding this comment.
Wait, I don't think this is right. Even if a mailbox is inactive, we should be able to terminate that mailbox (i.e. stop new messages from being sent to it)
There was a problem hiding this comment.
What happened was that for _MailboxState, the enum was named confusingly from the start, where for example, terminate() changes the transport's private status to INACTIVE. Thus, while it appears semantically wrong, the logic is actually right given that we raise TerminatedError on this INACTIVE that's actually suppose to mean mailbox terminated.
I'm going to change _MailboxState to just ACTIVE and TERMINATED, since it's purpose as a private status is just to distinguish between those two. It should now make sense semantically
| redis_host: str, | ||
| redis_port: int, | ||
| *, | ||
| heartbeat_stale_periods: int = 4, |
There was a problem hiding this comment.
Now that "status" is implemented in the client, I think the heartbeat_stale_periods should be a attribute of the client too
| redis_port: int, | ||
| *, | ||
| heartbeat_stale_periods: int = 4, | ||
| heartbeat_interval_s: float = 60, |
There was a problem hiding this comment.
And since the heartbeat loop is a method in client, I think this attribute should be in client as well
d969900 to
7e3e9b5
Compare
…alls, made update_heartbeat in client no longer a noop
Summary
Created a new category of status for mailbox status - Inactive - so that we can distinguish between mailboxes that are actively listening and alive against mailboxes that haven't been terminated but are idling
A mailbox is inactive when it's last active heartbeat was over 2 minutes ago
Related Issues
Relates to Issue #366
Changes
Testing
transport_test.py, backend_test.py
Pull Request Checklist
Please confirm the PR meets the following requirements.
pre-commit(e.g., ruff, mypy, etc.).