Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rabbit_mq/.ipynb_checkpoints/Untitled-checkpoint.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"cells": [],
"metadata": {},
"nbformat": 4,
"nbformat_minor": 5
}
6 changes: 6 additions & 0 deletions rabbit_mq/Untitled.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"cells": [],
"metadata": {},
"nbformat": 4,
"nbformat_minor": 5
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright 2022 Bloomberg Finance L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

class consumerInterface():
def __init__(self, routing_key : str, **kwargs) -> None:
pass

def startConsuming(self):
pass

def stopConsuming(self):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2022 Bloomberg Finance L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

class producerInterface():
def __init__(self, routing_key : str, pub_delay : int, message_producer : Any) -> None:
pass

def startPublishing(self):
pass
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
91 changes: 91 additions & 0 deletions rabbit_mq/setup/.ipynb_checkpoints/rmq_intro-checkpoint.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```python\n",
"# Copyright 2022 Bloomberg Finance L.P.\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"```\n",
"\n",
"# Rabbit MQ Setup Learning Item\n",
"\n",
"### What is Rabbit MQ?\n",
"\n",
"RabbitMQ is a lightweight, language agnostic & open source messaging software that implements the [AMQP (Advanced Message Queuing Protocol)](https://www.amqp.org/about/what). It works as a message broker that supports various types of asynchronous messaging features and routing types. Using RabbitMQ you can configure your message to get from a source to destinations in a variety of ways.\n",
"\n",
"Within this lab we'll focus on setting up a basic producer-consumer framework within rabbitMQ. This is just the beginning and there are many more ways rabbitMQ can be used! For more learning opportunities check out the official rabbitMQ [Getting Started](https://www.rabbitmq.com/getstarted.html) page\n",
"\n",
"### Setting up RabbitMQ within Docker\n",
"\n",
"For this lab we'll be using Docker to manage the rabbitMQ management image along with a Jupyter notebook image that we'll use to create applications that can produce & consume data using rabbitMQ. To begin let's make sure we're able to properly setup the rabbitMQ image from DockerHub & that we can run the management dashboard.\n",
"\n",
"#### Pulling the RabbitMQ image\n",
"\n",
"Through the lab we'll be using the docker image `rabbitmq:3-management`. This image includes all the basic rabbitMQ features with the rabbitMQ management UI. Pull down the image locally using `docker pull rabbitmq:3-management`\n",
"\n",
"#### Running the RabbitMQ Image\n",
"\n",
"To run the rabbitMQ image we can use the command `docker run --rm -p 15672:15672 -p 5672:5672 rabbitmq:3-management`. \n",
"After a small period you should see ouput to your terminal similar to:\n",
"```\n",
"2022-10-30 19:13:09.745287+00:00 [info] <0.728.0> Server startup complete; 4 plugins started.\n",
"2022-10-30 19:13:09.745287+00:00 [info] <0.728.0> * rabbitmq_prometheus\n",
"2022-10-30 19:13:09.745287+00:00 [info] <0.728.0> * rabbitmq_management\n",
"2022-10-30 19:13:09.745287+00:00 [info] <0.728.0> * rabbitmq_web_dispatch\n",
"2022-10-30 19:13:09.745287+00:00 [info] <0.728.0> * rabbitmq_management_agent\n",
"```\n",
"\n",
"This will start a docker container with a few options. The `--rm` flag tells docker to remove the continer after it exits. The two port flags (`-p`) map & expose ports 15672 & 5672. Port 5672 is used by the AMQ Protocol for messaging while port 15672 is used by the RabbitMQ management UI. For now our main concern port 15672. \n",
"\n",
"At this point & time you should be able to access the rabbitMQ management UI via `localhost:15672` in a web browser. You may be prompted with a login screen, which you can use the default username & password of *guest*.\n",
"\n",
"![alt text](./rabbit_mq_management_ui.PNG)\n",
"\n",
"\n",
"#### Interfacing with RabbitMQ\n",
"\n",
"For the scope of this lab we'll be using the [pika](https://pika.readthedocs.io/en/stable/) package. This is an easy to use python implementation of the AMQP protocol."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.9.5 64-bit",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.5"
},
"orig_nbformat": 4,
"vscode": {
"interpreter": {
"hash": "6faf60acf2378adaaab85c2073b6c5e11a669e6a3edbe78352cafb6588944519"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}
7 changes: 3 additions & 4 deletions rabbit_mq/setup/rmq_intro.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.9.5 64-bit",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
Expand All @@ -77,15 +77,14 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.5"
"version": "3.11.6"
},
"orig_nbformat": 4,
"vscode": {
"interpreter": {
"hash": "6faf60acf2378adaaab85c2073b6c5e11a669e6a3edbe78352cafb6588944519"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
"nbformat_minor": 4
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pika
from consumer_interface import mqConsumerInterface

class mqConsumer(mqConsumerInterface):
def __init__(self, binding_key: str, exchange_name: str, queue_name: str) -> None:
self.binding_key = binding_key
self.exchange_name = exchange_name
self.queue_name = queue_name
self.setupRMQConnection()

def setupRMQConnection(self) -> None:
# Connect to RabbitMQ running in the other container
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host="rabbitmq")
)
self.channel = self.connection.channel()

# Create the queue if it doesn't exist
self.channel.queue_declare(queue=self.queue_name)

# Create the exchange if it doesn't exist
self.channel.exchange_declare(exchange=self.exchange_name, exchange_type="direct")

# Bind the queue to the exchange with the binding key
self.channel.queue_bind(
queue=self.queue_name,
exchange=self.exchange_name,
routing_key=self.binding_key
)

# Tell RabbitMQ to call on_message_callback when a message arrives
self.channel.basic_consume(
queue=self.queue_name,
on_message_callback=self.on_message_callback,
auto_ack=False
)

def on_message_callback(self, channel, method_frame, header_frame, body) -> None:
# Acknowledge the message (tell RabbitMQ we got it)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
# Print the message decoded from bytes to string
print(body.decode("utf-8"))

def startConsuming(self) -> None:
print(" [*] Waiting for messages. To exit press CTRL+C")
self.channel.start_consuming()

def __del__(self) -> None:
print("Closing RMQ connection on destruction")
try:
self.channel.close()
self.connection.close()
except Exception:
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pika
from mqProducerInterface import mqProducerInterface

class mqProducer(mqProducerInterface):
def __init__(self, host, queue_name):
"""
Constructor: Save the two variables needed to instantiate the class
and initialize the RMQ connection.
"""
self.host = host
self.queue_name = queue_name

# Initialize connection and channel placeholders
self.connection = None
self.channel = None

# Call the setup function immediately
self.setupRMQConnection()

def setupRMQConnection(self):
"""
Establish connection to the RabbitMQ service.
"""
# Create a connection to the broker
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host)
)
self.channel = self.connection.channel()

# Ensure the queue exists
self.channel.queue_declare(queue=self.queue_name)

def publishOrder(self, message):
"""
Publish a simple UTF-8 string message from the parameter.
Close Channel and Connection afterwards.
"""
# Ensure the message is sent as a UTF-8 string
self.channel.basic_publish(
exchange='',
routing_key=self.queue_name,
body=message.encode('utf-8')
)

print(f" [x] Sent: {message}")

# Close Channel and Connection
self.channel.close()
self.connection.close()
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import pika
from producer_interface import mqProducerInterface

class mqProducer(mqProducerInterface):
def __init__(self, routing_key, exchange_name):
self.routing_key = routing_key
self.exchange_name = exchange_name
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.exchange_name, exchange_type='direct')

def publishOrder(self, message):
self.channel.basic_publish(
exchange=self.exchange_name,
routing_key=self.routing_key,
body=message.encode('utf-8')
)
print(f"Sent: {message}")
self.connection.close()