Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 17 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,9 @@ pip install calendar-queue

## Usage

### CalendarQueue
## Quickstart

`CalendarQueue` is a low-level, efficient queue for scheduling events at specific times:

```python
import asyncio
from datetime import datetime, timedelta
from calendar_queue import CalendarQueue

cq = CalendarQueue()

async def schedule_events():
for i in range(3):
scheduled_time = (datetime.now() + timedelta(seconds=i+1)).timestamp()
cq.put_nowait((scheduled_time, f"Event {i+1}"))

async def process_events():
for _ in range(3):
ts, event = await cq.get()
print(f"{datetime.fromtimestamp(ts).isoformat()}: {event}")

async def main():
await asyncio.gather(schedule_events(), process_events())

if __name__ == "__main__":
asyncio.run(main())
```

### Calendar

`Calendar` is a higher-level abstraction that simplifies working with `datetime` objects and provides an async iterator:
A minimal, practical example showing how `Calendar` can schedule and emit events: schedule three reminders a few seconds apart and consume them as they fire.

```python
import asyncio
Expand All @@ -64,24 +36,32 @@ from calendar_queue import Calendar

calendar = Calendar()

async def schedule_events():
for i in range(3):
scheduled_time = datetime.now() + timedelta(seconds=i+1)
calendar.schedule(f"Event {i+1}", when=scheduled_time)
async def producer():
for i in range(1, 4):
when = datetime.now() + timedelta(seconds=i * 2)
calendar.schedule(f"Reminder {i}", when=when)
print(f"scheduled Reminder {i} for {when.isoformat()}")
await asyncio.sleep(0.1)

async def process_events():
async def consumer():
async for ts, event in calendar:
print(f"{datetime.fromtimestamp(ts).isoformat()}: {event}")
if int(ts) == int((datetime.now() + timedelta(seconds=3)).timestamp()):
# stop after the last event
if event == "Reminder 3":
calendar.stop()

async def main():
await asyncio.gather(schedule_events(), process_events())
await asyncio.gather(producer(), consumer())

if __name__ == "__main__":
asyncio.run(main())
```

For more examples and the API reference, see the documentation in the `docs` folder:

- Tutorials: [docs/tutorials](docs/tutorials/)
- API reference: [docs/api-reference](docs/api-reference/)

## Development

This library is developed using Python 3.11 and [`pdm`](https://pdm-project.org/en/latest/) as dependency manager.
Expand Down
240 changes: 45 additions & 195 deletions docs/tutorials/calendar-queue.md
Original file line number Diff line number Diff line change
@@ -1,221 +1,71 @@
# Calendar Queue Tutorial

`CalendarQueue` is a [PriorityQueue](https://docs.python.org/3/library/asyncio-queue.html#priority-queue) in which the priority of each queued element (i.e. event) is the unix timestamp of the event. While using the `Calendar` class is much easier, here are some possible usages of `CalendarQueue`.
`CalendarQueue` is an asyncio-friendly priority queue where each item is a
tuple whose priority is the unix timestamp when the item should be emitted.
Below are concise, practical examples showing correct and idiomatic usage.

## The Event
## Basic rules

In a priority queue each element inserted must be comparable with the other so that they can be ordered. In python this translates as each item must implement the `__lt__` method for comparing two elements. To create items with several components, the simplest thing is to create a tuple, however be mindful so that at least one of the tuple elements is guaranteed to be unique, otherwise they cannot be ordered and therefore the priority will break.
- Each queued item should be comparable or wrapped in a tuple `(timestamp, item)`.
- Use `put_nowait((ts, item))` to schedule; use `await get()` to receive the next
scheduled item when its time has come.

If you need to use complex elements you can create your own class that implements `__lt__` as well or follow [python's documentation suggestion and use a dataclass that ignores the data item and only compares the priority number](https://docs.python.org/3/library/queue.html#queue.PriorityQueue).
## Producer / consumer example

This small example demonstrates a producer that schedules short jobs and a
consumer that processes them when their scheduled time arrives.

The `CalendarQueue` class supports type hints so you can define your own type and then use as:
```python
import asyncio
from datetime import datetime
from random import randrange

# simplest case using tuple
MyCustomEventType = tuple[str, str] # example: ("foo", "bar")
from calendar_queue import CalendarQueue

# complex event
class MyComplexEventType:
cq = CalendarQueue()

def __init__(self, ...):
...
async def producer(n=5):
for i in range(1, n + 1):
# schedule each job 1..5 seconds from now
ts = datetime.now().timestamp() + randrange(1, 6)
cq.put_nowait((ts, f"job-{i}"))
print(f"scheduled job-{i} for {datetime.fromtimestamp(ts).isoformat()}")
await asyncio.sleep(0.1)

def __lt__(self, other) -> bool:
async def consumer(total=5):
received = 0
while received < total:
ts, job = await cq.get()
print(f"{datetime.now().isoformat()}: running {job} scheduled for {datetime.fromtimestamp(ts).isoformat()}")
received += 1

# do your comparison
return True # True/False
async def main():
await asyncio.gather(producer(), consumer())

# use it
cq: CalendarQueue[MyComplexEventType] = CalendarQueue()
if __name__ == "__main__":
asyncio.run(main())
```

## Check the next events
## Inspecting and cancelling

Conveniently the time remaining to the next event can be checked using `next_in`. It the number of seconds remaining until the next events, otherwise returns `None` if no events are scheduled:

```python

time_remaining = cq.next_in()

if time_remaining is not None:
print(f"{time_remaining} seconds remaining until the next scheduled event")
else:
print("No scheduled events")

```

You can also peek the next event by using `peek`:
You can peek the next item with `peek()` and see the seconds until it fires
with `next_in()`:

```python
next_event = cq.peek()

if next_event:
print(f"Next event is {next_event}")
else:
print("No upcoming events")
time_left = cq.next_in()
```

## Deleting events

In case cancelling an event is needed, the `delete_items` method can be used. It needs as argument a callable function that receives the tuple `(timestamp, event)` and returns `True` if the element should be deleted, `False` otherwise.
Suppose we would like to cancel all events that are scheduled to happen in the next 5 minutes:
To remove scheduled items, use `delete_items(selector)` passing a selector
callable that returns `True` for items you want removed. For example, remove all
items scheduled within the next 5 minutes:

```python
import time

now = time.time()

deadline = now + 300 # now + 5 mins

def event_selector(item: tuple[int, Any]):

ts, event = item

if ts <= deadline:
return True

return False

cancelled_events = cq.cancel_events(event_selector)

for ts, ev in cancelled_events:
# do some checks on the cancelled events
...

deadline = time.time() + 300
deleted = cq.delete_items(lambda item: item[0] <= deadline)
```

## Example usage

A full example usage of `CalendarQueue` that involves two (or more) asyncio Tasks following the producer-consumer pattern.

Suppose we handle a take away restaurant, we gather the orders and the kitchen needs to cook the meals for the requested time for the customers to pick them up. To do so, we have:

1. producer tasks that would be the persons responsible for taking the orders
2. consumer tasks that would be the cooks

The consumer tasks will be triggered only at the scheduled time (we want to cook the meals when they need to be picked up, not when we receive the order!).

```python
import asyncio
from datetime import datetime, timedelta
from random import choice, choices, randint, randrange
from secrets import token_hex

from calendar_queue import CalendarQueue

# Let's define what's a meal order
Order = tuple[int, str, list] # (order id, customer name, meals)

# use the low level calendar queue
cq: CalendarQueue[Order] = CalendarQueue()

first_names = ["Clara", "John", "Dave", "Julia"]
last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia"]

meals = ["pasta", "pizza", "ramen", "hamburger", "dahl", "pad thai"]

# order id counter
id_count = 0

def get_customer_name() -> str:
"""Create random customer name"""

return f"{choice(first_names)} {choice(last_names)}"

def get_ordered_meal() -> list[str]:
"""Create random meal order"""

n_meals = randint(1, 5)
return choices(meals, k=n_meals)

async def get_order():
"""Wait for a customer call and put the new order in the queue"""

while True:

print("Waiting for a call..")

await asyncio.sleep(randint(1, 5))

# hello sir/madame, what's your name?
customer_name = get_customer_name()

# what would you like to order?
ordered_meal = get_ordered_meal()

# at which time will you pick it up?
scheduled_ts = datetime.now() + timedelta(seconds=randrange(5, 10))

global id_count
id_count += 1

# create a unique order id
order_id = id_count

print(f"Got a call from {customer_name}, order id {order_id}. "
f"Ordered {ordered_meal} to be picked up at {scheduled_ts.isoformat()}")

# put together the order
customer_order: Order = (order_id, customer_name, meals)

# put the order in the queue, no need to wait, the queue has no size
cq.put_nowait((scheduled_ts.timestamp(), customer_order))


async def wait_for_order_to_be_prepared():
"""This is the kitchen, wait for the right moment to start preparing the meals"""

while True:
try:
ts, el = await cq.get()
except KeyboardInterrupt:
break

print(f"{datetime.now().isoformat()}: preparing {el} "\
f"scheduled for {datetime.fromtimestamp(ts).isoformat()}")

async def main():

await asyncio.gather(import asyncio
from datetime import datetime
from random import randrange
from secrets import token_hex

from calendar_queue import CalendarQueue

# let's define what's a meal order
Order = tuple[str, str, list] # (order id, customer name, meals)

# create a CalendarQueue instance
cq: CalendarQueue[Order] = CalendarQueue()


async def put_random():

print("Waiting 3 seconds before starting to put")

await asyncio.sleep(3)

print("Wait completed, done")

while True:

await asyncio.sleep(1)

scheduled_ts = datetime.now().timestamp() + randrange(1, 5)

s = token_hex(8)

current_item: CustomItem = (s)

print(f"{datetime.now().isoformat()}: putting {current_item} scheduled for {datetime.fromtimestamp(scheduled_ts).isoformat()}")

cq.put_nowait((scheduled_ts, current_item))
asyncio.create_task(get_order()),
asyncio.create_task(wait_for_order_to_be_prepared()),
)


if __name__ == "__main__":
asyncio.run(main())
```

This file focuses on `CalendarQueue` primitives. For easier datetime-based
usage prefer the `Calendar` helper (see the `Calendar` tutorial).
Loading
Loading