A small demo that stitches together a simulated bus, MQTT broker, Postgres-backed consumer, and a FastAPI WebSocket feed so you can visualize live vehicle positions in the browser. The repo demonstrates how to move data from MQTT to a relational database and expose it via REST/WebSocket for a Mapbox GL frontend.

## Components

- Bus simulator (`sim_route_buses.py`): publishes location points along real routes in `routes/` to `buses/<id>/location` every second.
- MQTT broker: Mosquitto (Docker or local package) receives simulator messages and broadcasts them to subscribers.
- Consumer (`consumer.py`): listens on MQTT topics, writes snapshots to `bus_locations_latest`, and appends every point to `bus_locations_history`.
- API server (`api_server.py`): FastAPI exposes REST snapshots and history plus a `/ws/buses` WebSocket for live updates.
- Frontend (`index.html`): deck.gl + Mapbox GL page that fetches the Mapbox token from `/api/config` and connects to the WebSocket stream for visualization.
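
To make the simulator's role concrete, here is a minimal sketch of how one location message could be assembled. The payload field names (`bus_id`, `lat`, `lon`, `speed_kmh`, `ts_utc`) are an assumption that mirrors the database columns used in this demo; the actual payload produced by `sim_route_buses.py` may differ, and `build_message` is a hypothetical helper.

```python
import json
from datetime import datetime, timezone


def build_message(bus_id: str, lat: float, lon: float, speed_kmh: float) -> tuple[str, str]:
    """Return the (topic, JSON payload) pair for one location point.

    Field names are assumed to mirror the table columns; adjust them to
    whatever the real simulator publishes.
    """
    topic = f"buses/{bus_id}/location"
    payload = {
        "bus_id": bus_id,
        "lat": lat,
        "lon": lon,
        "speed_kmh": speed_kmh,
        # ISO 8601 timestamp in UTC
        "ts_utc": datetime.now(timezone.utc).isoformat(),
    }
    return topic, json.dumps(payload)


topic, payload = build_message("bus_001", 25.0330, 121.5654, 32.5)
print(topic)
print(payload)
```

A string like this can also be sent by hand with `mosquitto_pub` to exercise the consumer without running the simulator.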

## Prerequisites

- Python 3.10+ (asyncio support).
- PostgreSQL instance accessible locally.
- Mosquitto broker (Docker container or system install).
- Mapbox access token for the front-end (served via `/api/config`).
- Optional: Docker CLI for the Mosquitto container.

## Configuration

- Copy `.env.example` to `.env`.
- Fill in the values (`BROKER_HOST`, `BROKER_PORT`, `PG_DSN` or the `PG_*` pieces, `MAPBOX_TOKEN`).
- The Python scripts use `python-dotenv` to load the values; missing keys raise descriptive errors so you know which variable to add.
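
The "descriptive error" behaviour can be sketched as follows. This is an illustration, not the repo's actual code: `require_env` and `load_settings` are hypothetical helpers, and the sketch only handles `PG_DSN` (not the separate `PG_*` pieces). In the real scripts, `load_dotenv()` from `python-dotenv` would presumably be called first so that `.env` values land in `os.environ`.

```python
import os
from typing import Mapping


def require_env(name: str, env: Mapping[str, str] = os.environ) -> str:
    """Return a required setting or fail with a message naming the variable."""
    value = env.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing required setting {name}; add it to your .env file")
    return value


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Collect the settings this demo needs (hypothetical helper)."""
    return {
        "broker_host": require_env("BROKER_HOST", env),
        "broker_port": int(require_env("BROKER_PORT", env)),
        "pg_dsn": require_env("PG_DSN", env),
        "mapbox_token": require_env("MAPBOX_TOKEN", env),
    }


# Example with an in-memory mapping instead of the real environment.
sample_env = {
    "BROKER_HOST": "localhost",
    "BROKER_PORT": "1883",
    "PG_DSN": "dbname=realtime_demo user=postgres",
    "MAPBOX_TOKEN": "your_mapbox_token_here",
}
print(load_settings(sample_env)["broker_port"])
```

Passing the mapping explicitly keeps the helpers easy to test without touching the process environment.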

## PostgreSQL database setup

- Create the database (run as the `postgres` user or a privileged account):

      createdb -U postgres realtime_demo

- Run the following SQL (for example with `psql -d realtime_demo -f setup.sql`):

      -- latest snapshot per bus
      CREATE TABLE IF NOT EXISTS bus_locations_latest (
        bus_id TEXT PRIMARY KEY,
        lat DOUBLE PRECISION NOT NULL,
        lon DOUBLE PRECISION NOT NULL,
        speed_kmh REAL,
        ts_utc TIMESTAMPTZ,
        updated_at TIMESTAMPTZ DEFAULT now()
      );

      -- history of locations
      CREATE TABLE IF NOT EXISTS bus_locations_history (
        id SERIAL PRIMARY KEY,
        bus_id TEXT NOT NULL,
        lat DOUBLE PRECISION NOT NULL,
        lon DOUBLE PRECISION NOT NULL,
        speed_kmh REAL,
        ts_utc TIMESTAMPTZ,
        ingested_at TIMESTAMPTZ DEFAULT now()
      );

`bus_locations_latest` keeps the current position per vehicle; `bus_locations_history` logs every ingested point.
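
One way the two tables could be written to is sketched below: an upsert keyed on `bus_id` for the snapshot table and a plain insert for the history table. The SQL uses PostgreSQL's `INSERT ... ON CONFLICT` and psycopg2-style `%s` placeholders; `store_point` is a hypothetical helper, and the real `consumer.py` may structure this differently.

```python
UPSERT_LATEST = """
INSERT INTO bus_locations_latest (bus_id, lat, lon, speed_kmh, ts_utc, updated_at)
VALUES (%s, %s, %s, %s, %s, now())
ON CONFLICT (bus_id) DO UPDATE SET
    lat = EXCLUDED.lat,
    lon = EXCLUDED.lon,
    speed_kmh = EXCLUDED.speed_kmh,
    ts_utc = EXCLUDED.ts_utc,
    updated_at = now()
"""

INSERT_HISTORY = """
INSERT INTO bus_locations_history (bus_id, lat, lon, speed_kmh, ts_utc)
VALUES (%s, %s, %s, %s, %s)
"""


def store_point(cur, point: dict) -> None:
    """Write one location point using any DB-API cursor (e.g. psycopg2)."""
    params = (
        point["bus_id"],
        point["lat"],
        point["lon"],
        point.get("speed_kmh"),  # nullable column
        point.get("ts_utc"),     # nullable column
    )
    cur.execute(UPSERT_LATEST, params)   # replace the snapshot row
    cur.execute(INSERT_HISTORY, params)  # append to the history log
```

With psycopg2, wrapping the call as `with conn, conn.cursor() as cur: store_point(cur, point)` would commit both statements in a single transaction.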

## Local MQTT Broker (Mosquitto)

Option 1: Docker (recommended)

    docker run -it --name mosquitto \
      -p 1883:1883 -p 9001:9001 \
      eclipse-mosquitto

Option 2: macOS package

    brew install mosquitto

Useful tools:

- `mosquitto_pub`: publish custom JSON payloads for manual testing.
- `mosquitto_sub`: subscribe to topics (e.g., `mosquitto_sub -h localhost -p 1883 -t "buses/#"`).
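
The `buses/#` filter used above relies on MQTT wildcards: `#` matches any number of remaining levels and `+` matches exactly one level. The sketch below illustrates those matching rules in plain Python. It is a simplified teaching aid (it ignores special cases such as `$`-prefixed topics), and `topic_matches` is a hypothetical helper rather than part of any MQTT library.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last filter level,
            # where it matches everything that remains (including nothing).
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # the filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level mismatch
    # Without '#', filter and topic must have the same depth.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("buses/#", "buses/bus_001/location"))
print(topic_matches("buses/+/location", "buses/bus_001/location"))
print(topic_matches("buses/+/location", "buses/bus_001/status"))
```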

## Python dependencies

The minimal dependencies listed in `requirements.txt`:

    paho-mqtt
    psycopg2-binary
    fastapi
    uvicorn[standard]
    python-dotenv

Install them with pip:

    python -m pip install -r requirements.txt

`psycopg2-binary` is easier for local development; for production deployments, consider switching to the `psycopg2` package built against the system libpq.

## Running the demo

1) Create `.env` from `.env.example` and verify the credentials (Postgres DSN, Mapbox token, MQTT broker host/port).
2) Prepare the database by starting Postgres and running the SQL above to create the two tables.
3) Install Python dependencies: `python -m pip install -r requirements.txt`.
4) Start Mosquitto (Docker or brew) so MQTT topics are available before the consumer connects.
5) Start the API server: `uvicorn api_server:app --reload --port 8000`. This exposes REST, WebSocket, and config endpoints.
6) Start the consumer: `python consumer.py`. It connects to the MQTT broker and keeps Postgres updated.
7) Start the simulator: `python sim_route_buses.py`. It begins publishing simulated location data to MQTT.
8) Optionally monitor MQTT: run `mosquitto_sub -h localhost -p 1883 -t "buses/#"` in another terminal to watch raw messages.
9) Open the frontend: `index.html` can be served via `python -m http.server 8001` or opened directly. It fetches the Mapbox token from `/api/config` and subscribes to `/ws/buses`.

## Endpoints

- REST snapshot: `http://localhost:8000/api/buses`
- History: `http://localhost:8000/api/history?bus_id=bus_001&minutes=10`
- WebSocket feed: `ws://localhost:8000/ws/buses`
- Mapbox token: `http://localhost:8000/api/config`
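
For quick checks from Python rather than the browser, the history URL can be built and fetched with the standard library alone. This is a small sketch: `history_url` and `fetch_json` are hypothetical helpers, and the shape of the JSON returned by the API is not specified here, so the result is returned as parsed without further interpretation.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

API_BASE = "http://localhost:8000"


def history_url(bus_id: str, minutes: int, base: str = API_BASE) -> str:
    """Build the history endpoint URL with properly encoded query parameters."""
    query = urlencode({"bus_id": bus_id, "minutes": minutes})
    return f"{base}/api/history?{query}"


def fetch_json(url: str, timeout: float = 5.0):
    """GET a URL and parse the body as JSON (requires the API server to be running)."""
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)


print(history_url("bus_001", 10))
# → http://localhost:8000/api/history?bus_id=bus_001&minutes=10
```

Once the API server is up, `fetch_json(history_url("bus_001", 10))` retrieves the recent points for that bus.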

Every component relies on the `.env` configuration, and the MQTT broker must be running before the other components attempt to connect. Follow the numbered sequence above for the smoothest startup experience after cloning the repo.

# Realtime Bus Demo — README
Initial version:

- Vehicle simulator: a Python script simulates vehicle devices that publish location data to an MQTT broker.
  - Location points are generated along real bus routes, one point per second. Routes loop back automatically. See the `routes/` directory for three sample routes.
- MQTT Broker: Mosquitto running locally (or in Docker).
- Database: PostgreSQL. For this demo, a simple OLTP setup is sufficient to store both current positions and historical points.
- API: FastAPI (ASGI-native) is used to serve current positions and historical trajectories from the database.
  - You could use Flask, but since it is WSGI-based you would need an extension such as Flask-SocketIO to provide WebSocket support.
- WebSocket: FastAPI (used in this example) provides first-class WebSocket support.
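
A common way to push live updates to many WebSocket clients is a fan-out hub that gives each connection its own queue. The sketch below shows that pattern with plain `asyncio`. It is an assumption about how a server like `api_server.py` might be organised, not a description of its actual code, and `Broadcaster` is a hypothetical class.

```python
import asyncio


class Broadcaster:
    """Fan out each message to every subscriber's queue."""

    def __init__(self) -> None:
        self._subscribers: set[asyncio.Queue] = set()

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue(maxsize=100)
        self._subscribers.add(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        self._subscribers.discard(queue)

    def publish(self, message: str) -> None:
        for queue in self._subscribers:
            if queue.full():
                queue.get_nowait()  # drop the oldest update for slow clients
            queue.put_nowait(message)


async def demo() -> list[str]:
    hub = Broadcaster()
    first, second = hub.subscribe(), hub.subscribe()
    hub.publish('{"bus_id": "bus_001"}')
    # Both subscribers receive their own copy of the message.
    return [await first.get(), await second.get()]


print(asyncio.run(demo()))
```

In a FastAPI WebSocket handler, each connection would call `subscribe()`, forward queued items with `await websocket.send_text(...)`, and call `unsubscribe()` when the client disconnects.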
## Local MQTT Broker (Mosquitto)
Run Mosquitto in Docker (publishes port 1883 for MQTT and 9001 for WebSocket):

    docker run -it --name mosquitto \
      -p 1883:1883 -p 9001:9001 \
      eclipse-mosquitto

- `mosquitto_pub` (publisher): can be used for manual testing
- `mosquitto_sub` (subscriber): listens for messages on topics

Subscribe to all `buses` topics in one terminal:

    mosquitto_sub -h localhost -p 1883 -t "buses/#"

Run the simulator in another terminal to send messages:

    python sim_route_buses.py
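
The simulator's "walk a route and loop back" behaviour can be sketched as below. The route format is an assumption (a list of `(lat, lon)` waypoints); the files in `routes/` may use a different layout. `densify` and `loop_route` are hypothetical helpers, and the linear interpolation is only a simple stand-in for however the real script spaces its points.

```python
import itertools
from typing import Iterator

Point = tuple[float, float]  # (lat, lon)


def densify(points: list[Point], steps_per_segment: int) -> list[Point]:
    """Linearly interpolate extra points between consecutive waypoints."""
    dense: list[Point] = []
    for (lat1, lon1), (lat2, lon2) in zip(points, points[1:]):
        for step in range(steps_per_segment):
            t = step / steps_per_segment
            dense.append((lat1 + (lat2 - lat1) * t, lon1 + (lon2 - lon1) * t))
    dense.append(points[-1])  # include the final waypoint once
    return dense


def loop_route(points: list[Point]) -> Iterator[Point]:
    """Yield route points forever, restarting from the first point at the end."""
    return itertools.cycle(points)


route = densify([(0.0, 0.0), (0.0, 2.0)], steps_per_segment=2)
print(route)
print(list(itertools.islice(loop_route(route), 5)))
```

A real publisher would take one point from the iterator per second (for example with `time.sleep(1)`) and send it to the bus's MQTT topic.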
---
## PostgreSQL — Database setup
The SQL and commands below help create the database and two tables: `bus_locations_latest` and
`bus_locations_history`.
1) Create the database (run as the `postgres` user or a privileged account):

       createdb -U postgres realtime_demo

2) Connect to the `realtime_demo` database and create the tables:

       -- latest snapshot per bus
       CREATE TABLE IF NOT EXISTS bus_locations_latest (
         bus_id TEXT PRIMARY KEY,
         lat DOUBLE PRECISION NOT NULL,
         lon DOUBLE PRECISION NOT NULL,
         speed_kmh REAL,
         ts_utc TIMESTAMPTZ,
         updated_at TIMESTAMPTZ DEFAULT now()
       );

       -- history of locations
       CREATE TABLE IF NOT EXISTS bus_locations_history (
         id SERIAL PRIMARY KEY,
         bus_id TEXT NOT NULL,
         lat DOUBLE PRECISION NOT NULL,
         lon DOUBLE PRECISION NOT NULL,
         speed_kmh REAL,
         ts_utc TIMESTAMPTZ,
         ingested_at TIMESTAMPTZ DEFAULT now()
       );

Notes:
- `bus_locations_latest` stores the most recent position per vehicle (primary key: `bus_id`).
- `bus_locations_history` stores each received point so you can query or replay trajectories.
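
To illustrate replaying a trajectory, the sketch below turns history rows into a GeoJSON `LineString`, a format that Mapbox GL and deck.gl can both render. The query text and `rows_to_linestring` are illustrative assumptions, not code from the repo. Note that GeoJSON orders coordinates as `[lon, lat]`, and a valid `LineString` needs at least two positions.

```python
import json

# Hypothetical query: fetch one bus's points, oldest first.
HISTORY_QUERY = """
SELECT lat, lon
FROM bus_locations_history
WHERE bus_id = %s
ORDER BY ts_utc
"""


def rows_to_linestring(bus_id: str, rows) -> dict:
    """Convert (lat, lon) rows, ordered oldest first, into a GeoJSON Feature."""
    coordinates = [[lon, lat] for lat, lon in rows]  # GeoJSON uses [lon, lat]
    return {
        "type": "Feature",
        "properties": {"bus_id": bus_id},
        "geometry": {"type": "LineString", "coordinates": coordinates},
    }


sample_rows = [(25.0330, 121.5654), (25.0340, 121.5660)]
print(json.dumps(rows_to_linestring("bus_001", sample_rows)))
```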
---
## Python — Package list
Recommended minimal dependencies (put them in `requirements.txt`, one per line):

    paho-mqtt
    psycopg2-binary
    fastapi
    uvicorn[standard]
    python-dotenv

Install:

    python -m pip install -r requirements.txt

Note: `psycopg2-binary` is convenient for local development; for production you may prefer a system-installed `psycopg2`.
---
## Mapbox token
The front-end `index.html` contains a Mapbox token placeholder. You can:
1) Replace the `mapboxglAccessToken` string directly in `index.html` for a quick test, or
2) Inject the token at runtime from your development server (requires additional setup).
Quick method: open `index.html` and replace the following line with your token:
```js
const mapboxglAccessToken = 'your_mapbox_token_here';
```

Get a token from https://www.mapbox.com and keep it secret in production.

## Running the demo

- Start Postgres and create the database and tables (see above).
- Start Mosquitto (Docker or brew). To watch all bus topics, subscribe with:

      mosquitto_sub -h localhost -p 1883 -t "buses/#"

- Install Python packages:

      python -m pip install -r requirements.txt

- Start the API server (FastAPI):

      uvicorn api_server:app --reload --port 8000

- Start the consumer:

      python consumer.py

- Start the simulator, which publishes the simulated bus messages:

      python sim_route_buses.py

- Open `index.html` directly or serve it from a static server, and make sure the Mapbox token is configured.

API / WebSocket:

- REST: `http://localhost:8000/api/buses` (latest snapshot)
- History: `http://localhost:8000/api/history?bus_id=bus_001&minutes=10`
- WebSocket: `ws://localhost:8000/ws/buses`

The consumer subscribes to `buses/+/location`, receives JSON payloads, and writes them to the PostgreSQL tables `bus_locations_latest` (upsert) and `bus_locations_history` (append).
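
The "receives JSON" step can be sketched as a small parsing function that takes the bus id from the topic and validates the payload before anything reaches the database. The payload field names are assumptions that mirror the table columns, and `parse_location` is a hypothetical helper. With paho-mqtt, the inputs would come from `msg.topic` and `msg.payload` inside the `on_message` callback.

```python
import json

REQUIRED_FIELDS = ("lat", "lon")  # assumed mandatory, matching the NOT NULL columns


def parse_location(topic: str, payload: bytes) -> dict:
    """Validate one MQTT message and return a row-shaped dict."""
    parts = topic.split("/")
    if len(parts) != 3 or parts[0] != "buses" or parts[2] != "location":
        raise ValueError(f"unexpected topic: {topic}")

    data = json.loads(payload)  # raises ValueError on malformed JSON
    if not isinstance(data, dict):
        raise ValueError("payload must be a JSON object")

    missing = [field for field in REQUIRED_FIELDS if field not in data]
    if missing:
        raise ValueError(f"payload is missing fields: {missing}")

    return {
        "bus_id": parts[1],  # the topic is treated as the source of truth
        "lat": float(data["lat"]),
        "lon": float(data["lon"]),
        "speed_kmh": data.get("speed_kmh"),
        "ts_utc": data.get("ts_utc"),
    }


print(parse_location("buses/bus_001/location", b'{"lat": 25.033, "lon": 121.5654}'))
```

Rejecting malformed messages at this stage keeps a single bad payload from interrupting the consumer's write loop.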