diff --git a/rabbit_mq/.ipynb_checkpoints/Untitled-checkpoint.ipynb b/rabbit_mq/.ipynb_checkpoints/Untitled-checkpoint.ipynb new file mode 100644 index 00000000..363fcab7 --- /dev/null +++ b/rabbit_mq/.ipynb_checkpoints/Untitled-checkpoint.ipynb @@ -0,0 +1,6 @@ +{ + "cells": [], + "metadata": {}, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/rabbit_mq/Untitled.ipynb b/rabbit_mq/Untitled.ipynb new file mode 100644 index 00000000..363fcab7 --- /dev/null +++ b/rabbit_mq/Untitled.ipynb @@ -0,0 +1,6 @@ +{ + "cells": [], + "metadata": {}, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/rabbit_mq/interfaces/.ipynb_checkpoints/consumerInterface-checkpoint.py b/rabbit_mq/interfaces/.ipynb_checkpoints/consumerInterface-checkpoint.py new file mode 100644 index 00000000..4c042ab3 --- /dev/null +++ b/rabbit_mq/interfaces/.ipynb_checkpoints/consumerInterface-checkpoint.py @@ -0,0 +1,23 @@ +# Copyright 2022 Bloomberg Finance L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class consumerInterface(): + def __init__(self, routing_key : str, **kwargs) -> None: + pass + + def startConsuming(self): + pass + + def stopConsuming(self): + pass diff --git a/rabbit_mq/interfaces/.ipynb_checkpoints/producerInterface-checkpoint.py b/rabbit_mq/interfaces/.ipynb_checkpoints/producerInterface-checkpoint.py new file mode 100644 index 00000000..3088a368 --- /dev/null +++ b/rabbit_mq/interfaces/.ipynb_checkpoints/producerInterface-checkpoint.py @@ -0,0 +1,20 @@ +# Copyright 2022 Bloomberg Finance L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class producerInterface(): + def __init__(self, routing_key : str, pub_delay : int, message_producer : Any) -> None: + pass + + def startPublishing(self): + pass diff --git a/rabbit_mq/setup/.ipynb_checkpoints/rabbit_mq_management_ui-checkpoint.PNG b/rabbit_mq/setup/.ipynb_checkpoints/rabbit_mq_management_ui-checkpoint.PNG new file mode 100644 index 00000000..4ca62328 Binary files /dev/null and b/rabbit_mq/setup/.ipynb_checkpoints/rabbit_mq_management_ui-checkpoint.PNG differ diff --git a/rabbit_mq/setup/.ipynb_checkpoints/rmq_intro-checkpoint.ipynb b/rabbit_mq/setup/.ipynb_checkpoints/rmq_intro-checkpoint.ipynb new file mode 100644 index 00000000..8cccbdc9 --- /dev/null +++ b/rabbit_mq/setup/.ipynb_checkpoints/rmq_intro-checkpoint.ipynb @@ -0,0 +1,91 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "```python\n", + "# Copyright 2022 Bloomberg Finance L.P.\n", + "#\n", + "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", + "# you may not use this file except in compliance with the License.\n", + "# You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing, software\n", + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", + "# See the License for the specific language governing permissions and\n", + "# limitations under the License.\n", + "```\n", + "\n", + "# Rabbit MQ Setup Learning Item\n", + "\n", + "### What is Rabbit MQ?\n", + "\n", + "RabbitMQ is a lightweight, language agnostic & open source messaging software that implements the [AMQP (Advanced Message Queuing Protocol)](https://www.amqp.org/about/what). It works as a message broker that supports various types of asynchronous messaging features and routing types. Using RabbitMQ you can configure your message to get from a source to destinations in a variety of ways.\n", + "\n", + "Within this lab we'll focus on setting up a basic producer-consumer framework within rabbitMQ. This is just the beginning and there are many more ways rabbitMQ can be used! For more learning opportunities check out the official rabbitMQ [Getting Started](https://www.rabbitmq.com/getstarted.html) page\n", + "\n", + "### Setting up RabbitMQ within Docker\n", + "\n", + "For this lab we'll be using Docker to manage the rabbitMQ management image along with a Jupyter notebook image that we'll use to create applications that can produce & consume data using rabbitMQ. To begin let's make sure we're able to properly setup the rabbitMQ image from DockerHub & that we can run the management dashboard.\n", + "\n", + "#### Pulling the RabbitMQ image\n", + "\n", + "Through the lab we'll be using the docker image `rabbitmq:3-management`. This image includes all the basic rabbitMQ features with the rabbitMQ management UI. Pull down the image locally using `docker pull rabbitmq:3-management`\n", + "\n", + "#### Running the RabbitMQ Image\n", + "\n", + "To run the rabbitMQ image we can use the command `docker run --rm -p 15672:15672 -p 5672:5672 rabbitmq:3-management`. \n", + "After a small period you should see ouput to your terminal similar to:\n", + "```\n", + "2022-10-30 19:13:09.745287+00:00 [info] <0.728.0> Server startup complete; 4 plugins started.\n", + "2022-10-30 19:13:09.745287+00:00 [info] <0.728.0> * rabbitmq_prometheus\n", + "2022-10-30 19:13:09.745287+00:00 [info] <0.728.0> * rabbitmq_management\n", + "2022-10-30 19:13:09.745287+00:00 [info] <0.728.0> * rabbitmq_web_dispatch\n", + "2022-10-30 19:13:09.745287+00:00 [info] <0.728.0> * rabbitmq_management_agent\n", + "```\n", + "\n", + "This will start a docker container with a few options. The `--rm` flag tells docker to remove the continer after it exits. The two port flags (`-p`) map & expose ports 15672 & 5672. Port 5672 is used by the AMQ Protocol for messaging while port 15672 is used by the RabbitMQ management UI. For now our main concern port 15672. \n", + "\n", + "At this point & time you should be able to access the rabbitMQ management UI via `localhost:15672` in a web browser. You may be prompted with a login screen, which you can use the default username & password of *guest*.\n", + "\n", + "![alt text](./rabbit_mq_management_ui.PNG)\n", + "\n", + "\n", + "#### Interfacing with RabbitMQ\n", + "\n", + "For the scope of this lab we'll be using the [pika](https://pika.readthedocs.io/en/stable/) package. This is an easy to use python implementation of the AMQP protocol." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.9.5 64-bit", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.5" + }, + "orig_nbformat": 4, + "vscode": { + "interpreter": { + "hash": "6faf60acf2378adaaab85c2073b6c5e11a669e6a3edbe78352cafb6588944519" + } + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/rabbit_mq/setup/rmq_intro.ipynb b/rabbit_mq/setup/rmq_intro.ipynb index 8cccbdc9..a2046581 100644 --- a/rabbit_mq/setup/rmq_intro.ipynb +++ b/rabbit_mq/setup/rmq_intro.ipynb @@ -63,7 +63,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3.9.5 64-bit", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, @@ -77,9 +77,8 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.5" + "version": "3.11.6" }, - "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "6faf60acf2378adaaab85c2073b6c5e11a669e6a3edbe78352cafb6588944519" @@ -87,5 +86,5 @@ } }, "nbformat": 4, - "nbformat_minor": 2 + "nbformat_minor": 4 } diff --git a/tech_lab_on_campus/market_watch/producer_and_consumer/consumer/solution/consumer_sol.py b/tech_lab_on_campus/market_watch/producer_and_consumer/consumer/solution/consumer_sol.py new file mode 100644 index 00000000..1f0fc55b --- /dev/null +++ b/tech_lab_on_campus/market_watch/producer_and_consumer/consumer/solution/consumer_sol.py @@ -0,0 +1,54 @@ +import pika +from consumer_interface import mqConsumerInterface + +class mqConsumer(mqConsumerInterface): + def __init__(self, binding_key: str, exchange_name: str, queue_name: str) -> None: + self.binding_key = binding_key + self.exchange_name = exchange_name + self.queue_name = queue_name + self.setupRMQConnection() + + def setupRMQConnection(self) -> None: + # Connect to RabbitMQ running in the other container + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host="rabbitmq") + ) + self.channel = self.connection.channel() + + # Create the queue if it doesn't exist + self.channel.queue_declare(queue=self.queue_name) + + # Create the exchange if it doesn't exist + self.channel.exchange_declare(exchange=self.exchange_name, exchange_type="direct") + + # Bind the queue to the exchange with the binding key + self.channel.queue_bind( + queue=self.queue_name, + exchange=self.exchange_name, + routing_key=self.binding_key + ) + + # Tell RabbitMQ to call on_message_callback when a message arrives + self.channel.basic_consume( + queue=self.queue_name, + on_message_callback=self.on_message_callback, + auto_ack=False + ) + + def on_message_callback(self, channel, method_frame, header_frame, body) -> None: + # Acknowledge the message (tell RabbitMQ we got it) + channel.basic_ack(delivery_tag=method_frame.delivery_tag) + # Print the message decoded from bytes to string + print(body.decode("utf-8")) + + def startConsuming(self) -> None: + print(" [*] Waiting for messages. To exit press CTRL+C") + self.channel.start_consuming() + + def __del__(self) -> None: + print("Closing RMQ connection on destruction") + try: + self.channel.close() + self.connection.close() + except Exception: + pass diff --git a/tech_lab_on_campus/market_watch/producer_and_consumer/consumer/solution/producer_sol.py b/tech_lab_on_campus/market_watch/producer_and_consumer/consumer/solution/producer_sol.py new file mode 100644 index 00000000..5e04b994 --- /dev/null +++ b/tech_lab_on_campus/market_watch/producer_and_consumer/consumer/solution/producer_sol.py @@ -0,0 +1,49 @@ +import pika +from mqProducerInterface import mqProducerInterface + +class mqProducer(mqProducerInterface): + def __init__(self, host, queue_name): + """ + Constructor: Save the two variables needed to instantiate the class + and initialize the RMQ connection. + """ + self.host = host + self.queue_name = queue_name + + # Initialize connection and channel placeholders + self.connection = None + self.channel = None + + # Call the setup function immediately + self.setupRMQConnection() + + def setupRMQConnection(self): + """ + Establish connection to the RabbitMQ service. + """ + # Create a connection to the broker + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host) + ) + self.channel = self.connection.channel() + + # Ensure the queue exists + self.channel.queue_declare(queue=self.queue_name) + + def publishOrder(self, message): + """ + Publish a simple UTF-8 string message from the parameter. + Close Channel and Connection afterwards. + """ + # Ensure the message is sent as a UTF-8 string + self.channel.basic_publish( + exchange='', + routing_key=self.queue_name, + body=message.encode('utf-8') + ) + + print(f" [x] Sent: {message}") + + # Close Channel and Connection + self.channel.close() + self.connection.close() \ No newline at end of file diff --git a/tech_lab_on_campus/market_watch/producer_and_consumer/producer/solution/producer_sol.py b/tech_lab_on_campus/market_watch/producer_and_consumer/producer/solution/producer_sol.py new file mode 100644 index 00000000..bd1dda24 --- /dev/null +++ b/tech_lab_on_campus/market_watch/producer_and_consumer/producer/solution/producer_sol.py @@ -0,0 +1,19 @@ +import pika +from producer_interface import mqProducerInterface + +class mqProducer(mqProducerInterface): + def __init__(self, routing_key, exchange_name): + self.routing_key = routing_key + self.exchange_name = exchange_name + self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq')) + self.channel = self.connection.channel() + self.channel.exchange_declare(exchange=self.exchange_name, exchange_type='direct') + + def publishOrder(self, message): + self.channel.basic_publish( + exchange=self.exchange_name, + routing_key=self.routing_key, + body=message.encode('utf-8') + ) + print(f"Sent: {message}") + self.connection.close() \ No newline at end of file