This repository contains an end-to-end example of fetching real-time crypto trades (from Binance via CCXT), publishing them to Kafka, consuming them from Kafka, and computing 1-minute OHLC bars. The bars are then stored in InfluxDB for visualization through Grafana.
graph LR
A["CCXT Script(Python)"] --> B["Kafka Producer(Python)"]
B --> C["Kafka Broker(s)(Cluster)"]
C --> D["Kafka Consumer(Sink to DB)"]
D --> E["Data Store(InfluxDB)"]
E --> F["Visualization(Grafana)"]
C --> Z["Zookeeper(Cluster)"]
- Docker Compose (docker-compose.yml): Defines containers for Zookeeper, Kafka, InfluxDB, and Grafana.
- Kafka Producer (kafka_producer.py): Fetches trades using CCXT and sends them to a Kafka topic (crypto-ticks by default).
- Kafka Consumer with InfluxDB (kafka_consumer_with_influxdb.py): Consumes trades from Kafka, computes OHLC bars in memory, and periodically writes them to InfluxDB.
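The in-memory OHLC step described above can be sketched roughly as follows. This is a minimal illustration, not the code in kafka_consumer_with_influxdb.py: the function names (`minute_bucket`, `update_bars`, `pop_closed_bars`) are hypothetical, and it assumes each message carries the `symbol`, `timestamp` (milliseconds), `price`, and `amount` fields of a CCXT trade and that trades arrive in timestamp order.

```python
BAR_MS = 60_000  # one minute in milliseconds


def minute_bucket(timestamp_ms):
    """Floor a millisecond timestamp to the start of its minute."""
    return timestamp_ms - (timestamp_ms % BAR_MS)


def update_bars(bars, trade):
    """Fold one trade into the in-memory bars, keyed by (symbol, minute start)."""
    key = (trade["symbol"], minute_bucket(trade["timestamp"]))
    price, amount = float(trade["price"]), float(trade["amount"])
    bar = bars.get(key)
    if bar is None:
        # First trade of the minute sets all four prices.
        bars[key] = {"open": price, "high": price, "low": price,
                     "close": price, "volume": amount}
    else:
        bar["high"] = max(bar["high"], price)
        bar["low"] = min(bar["low"], price)
        bar["close"] = price  # assumes trades arrive in timestamp order
        bar["volume"] += amount
    return bars


def pop_closed_bars(bars, now_ms):
    """Remove and return the bars whose minute has fully elapsed."""
    current = minute_bucket(now_ms)
    closed = [(key, bars[key]) for key in list(bars) if key[1] < current]
    for key, _ in closed:
        del bars[key]
    return closed
```

Keeping only unfinished minutes in memory and flushing closed ones on a timer is one simple way to get the "periodically writes" behaviour; late or out-of-order trades would need extra handling that this sketch leaves out.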
- Docker and Docker Compose
- Python 3.7+ (if you plan to run the producer/consumer locally)
- Internet connection to fetch trades from Binance (via CCXT)
Defines the following services:
- Zookeeper
  - Image: bitnami/zookeeper:latest
  - Container name: zookeeper
  - Port: 2181 (mapped to the host)
- Kafka
  - Image: bitnami/kafka:latest
  - Container name: kafka
  - Depends on Zookeeper
  - Ports: 9092 (mapped to the host)
  - Environment variables:
    - KAFKA_BROKER_ID=1
    - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
    - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
    - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    - ALLOW_PLAINTEXT_LISTENER=yes
- InfluxDB (1.8)
  - Image: influxdb:1.8
  - Container name: influxdb
  - Port: 8086 (mapped to the host)
  - Volume: influx_data (for data persistence)
  - Environment variables:
    - INFLUXDB_DB=crypto_data
    - INFLUXDB_USER=admin
    - INFLUXDB_ADMIN_USER=admin
    - INFLUXDB_ADMIN_PASSWORD=admin123
    - INFLUXDB_PASSWORD=admin123
- Grafana
  - Image: grafana/grafana:latest
  - Container name: grafana
  - Port: 3000 (mapped to the host)
  - Depends on InfluxDB
  - Volume: grafana_data (for data persistence)
  - Environment variable: GF_SECURITY_ADMIN_PASSWORD=admin123

Volumes:
- influx_data
- grafana_data
- Purpose: Continuously fetches recent trades from Binance (using CCXT) and publishes them to the crypto-ticks Kafka topic.
- Fetch interval: 10 seconds (configurable via FETCH_INTERVAL).
- Kafka servers: Defaults to localhost:9092 in the code.
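A polling producer along these lines could look like the sketch below. It is an illustration under assumptions rather than the contents of kafka_producer.py: `trade_to_message` and `run` are hypothetical names, the JSON payload shape is a guess, and advancing `since` by one millisecond is a simplification that can skip trades sharing the last timestamp.

```python
import json
import time

FETCH_INTERVAL = 10               # seconds between polls
KAFKA_SERVERS = "localhost:9092"  # default broker address from the description above
TOPIC = "crypto-ticks"
SYMBOL = "BTC/USDT"


def trade_to_message(trade):
    """Serialize the CCXT trade fields that downstream code needs as JSON bytes."""
    keys = ("id", "timestamp", "symbol", "side", "price", "amount")
    return json.dumps({k: trade[k] for k in keys}).encode("utf-8")


def run():
    # Imported here so trade_to_message stays usable without the third-party packages.
    import ccxt
    from kafka import KafkaProducer

    exchange = ccxt.binance({"enableRateLimit": True})
    producer = KafkaProducer(bootstrap_servers=KAFKA_SERVERS)
    since = None  # millisecond timestamp to fetch from; None means "most recent"
    while True:
        trades = exchange.fetch_trades(SYMBOL, since=since)
        for trade in trades:
            producer.send(TOPIC, value=trade_to_message(trade))
        if trades:
            # Simplification: may skip trades that share the last timestamp.
            since = trades[-1]["timestamp"] + 1
        producer.flush()
        time.sleep(FETCH_INTERVAL)
```

Calling `run()` starts the loop; it needs the Kafka broker to be reachable and outbound access to Binance.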
Usage (locally):
pip install ccxt kafka-python
python kafka_producer.py
Clone the repository:
git clone https://github.com/yourusername/yourrepo.git
cd yourrepo
Start the services:
docker-compose up -d
This will start the following containers:
- zookeeper
- kafka
- influxdb
- grafana
Verify their status:
docker-compose ps
- Kafka should be running on localhost:9092.
- Test connectivity using a Kafka client or the kafkacat utility.
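If no Kafka client is at hand, a quick first check is whether anything is listening on the mapped port at all. The helper below is a generic TCP probe using only the standard library; `port_is_open` is a hypothetical name, and a successful connection only shows that the port accepts connections, not that the broker is healthy or that its advertised listeners are correct.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False
```

For example, `port_is_open("localhost", 9092)` probes the Kafka port, and the same call with 8086 or 3000 probes InfluxDB or Grafana.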
- InfluxDB is available on localhost:8086.
- You can verify by running:
docker exec -it influxdb influx -username admin -password admin123
Then:
SHOW DATABASES;
You should see crypto_data.
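The same check can be done from Python with the influxdb client package, which may be convenient before starting the consumer. This is a small sketch: `database_exists` and `check_influx` are hypothetical helper names, and the credentials are the ones from the Compose environment above.

```python
def database_exists(client, name):
    """Return True if `name` appears in the client's database list."""
    return any(db["name"] == name for db in client.get_list_database())


def check_influx(host="localhost", port=8086):
    """Connect with the Compose credentials and look for the crypto_data database."""
    from influxdb import InfluxDBClient  # pip install influxdb

    client = InfluxDBClient(host=host, port=port,
                            username="admin", password="admin123")
    return database_exists(client, "crypto_data")
```

`check_influx()` should return True once the influxdb container has finished initializing.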
- Grafana is accessible at http://localhost:3000.
- Default credentials: admin / admin123.
- After login, add InfluxDB as a data source.
pip install ccxt kafka-python
python kafka_producer.py
- Fetches trades for BTC/USDT.
- Sends them to crypto-ticks.
pip install kafka-python influxdb
python kafka_consumer_with_influxdb.py
- Consumes trades from crypto-ticks.
- Aggregates into 1-minute bars and writes them to crypto_data in InfluxDB.
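Writing a finished bar to InfluxDB might look like the sketch below, using the influxdb client's `write_points` with one point per bar. The measurement name ohlc_1m comes from the Grafana steps in this README; the `symbol` tag, the field names, and the helper names `bar_to_point` and `write_bars` are assumptions for illustration and may differ from the actual consumer script.

```python
from datetime import datetime, timezone

MEASUREMENT = "ohlc_1m"


def bar_to_point(symbol, minute_start_ms, bar):
    """Convert one OHLC bar into the dict format accepted by write_points."""
    ts = datetime.fromtimestamp(minute_start_ms / 1000, tz=timezone.utc)
    fields = ("open", "high", "low", "close", "volume")
    return {
        "measurement": MEASUREMENT,
        "tags": {"symbol": symbol},               # assumed tag name
        "time": ts.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "fields": {name: float(bar[name]) for name in fields},
    }


def write_bars(client, closed_bars):
    """Write ((symbol, minute_start_ms), bar) pairs; return how many were sent."""
    points = [bar_to_point(symbol, start, bar)
              for (symbol, start), bar in closed_bars]
    if points:
        client.write_points(points)
    return len(points)
```

Here `client` would be an `InfluxDBClient` created with `database="crypto_data"`. Casting every field to float avoids the field type conflicts InfluxDB raises when a field is first written as an integer and later as a float.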
- Go to http://localhost:3000.
- Log in with the default credentials (admin / admin123).
- Add a new data source:
  - Type: InfluxDB
  - URL: http://influxdb:8086 when Grafana runs in the Docker Compose network (as it does here), or http://localhost:8086 if Grafana runs directly on your host
  - Database: crypto_data
  - User: admin
  - Password: admin123
- Create a new dashboard and panel.
- Query the measurement ohlc_1m to chart the data.
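Before building a panel, it can help to confirm that bars are actually arriving. The sketch below builds an InfluxQL query for the most recent rows of ohlc_1m; `recent_bars_query` is a hypothetical helper, and the field names and the `symbol` tag are assumptions that should be adjusted to whatever the consumer really writes.

```python
def recent_bars_query(measurement="ohlc_1m", symbol="BTC/USDT", lookback="1h"):
    """Build an InfluxQL query for the ten most recent bars of one symbol."""
    return (
        f'SELECT "open", "high", "low", "close" FROM "{measurement}" '
        f"WHERE \"symbol\" = '{symbol}' AND time > now() - {lookback} "
        "ORDER BY time DESC LIMIT 10"
    )
```

With an `InfluxDBClient` connected to crypto_data, `list(client.query(recent_bars_query()).get_points())` returns the rows as dicts; an empty list suggests the consumer has not written any bars yet.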
- Kafka Connection Errors: Ensure KAFKA_ADVERTISED_LISTENERS is set to PLAINTEXT://localhost:9092 if the producer/consumer scripts run on your host.
- InfluxDB Write Errors: Check the consumer logs for details. Verify the InfluxDB credentials and database name.
- Grafana Access: If port 3000 is already in use, either change the host port in the docker-compose.yml file or free up that port.
Contributions are welcome! Feel free to open issues or pull requests for bug fixes, new features, or improvements.
MIT License