From de6b4af798a7fdf23e2819c3ea88ccf795d2f326 Mon Sep 17 00:00:00 2001 From: WebAxol Date: Tue, 17 Feb 2026 17:28:10 -0600 Subject: [PATCH 1/2] change(.gitignore): Now ignoring '.venv' to avoid pushing the virtual environment; docker should be used at runtime. --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 6a10e732..c2733434 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ node_modules/ # NextJS next-env.d.ts .next/ +.venv \ No newline at end of file From 46b801cff4a151eab0788bf3fbf9d3335fe2a446 Mon Sep 17 00:00:00 2001 From: WebAxol Date: Tue, 17 Feb 2026 17:31:45 -0600 Subject: [PATCH 2/2] feat(rabbit_mq_consumer): Implemented class 'RabbitMQConsumer', which implements 'mqConsumerInterface'. The module is used by 'consume.py', which provides the required context to connect RabbitMQ. --- .../producer_and_consumer/consumer/consume.py | 5 +- .../consumer/solution/rabbit_mq_consumer.py | 89 +++++++++++++++++++ 2 files changed, 91 insertions(+), 3 deletions(-) create mode 100755 tech_lab_on_campus/market_watch/producer_and_consumer/consumer/solution/rabbit_mq_consumer.py diff --git a/tech_lab_on_campus/market_watch/producer_and_consumer/consumer/consume.py b/tech_lab_on_campus/market_watch/producer_and_consumer/consumer/consume.py index 01570ab3..255494ab 100755 --- a/tech_lab_on_campus/market_watch/producer_and_consumer/consumer/consume.py +++ b/tech_lab_on_campus/market_watch/producer_and_consumer/consumer/consume.py @@ -17,11 +17,10 @@ import os import sys -from solution.consumer_sol import mqConsumer # pylint: disable=import-error - +from solution.rabbit_mq_consumer import RabbitMQConsumer def main() -> None: - consumer = mqConsumer(binding_key="Tech Lab Key",exchange_name="Tech Lab Exchange",queue_name="Tech Lab Queue") + consumer = RabbitMQConsumer(binding_key="Tech Lab Key",exchange_name="Tech Lab Exchange",queue_name="Tech Lab Queue") consumer.startConsuming() diff --git a/tech_lab_on_campus/market_watch/producer_and_consumer/consumer/solution/rabbit_mq_consumer.py b/tech_lab_on_campus/market_watch/producer_and_consumer/consumer/solution/rabbit_mq_consumer.py new file mode 100755 index 00000000..26d3bfa6 --- /dev/null +++ b/tech_lab_on_campus/market_watch/producer_and_consumer/consumer/solution/rabbit_mq_consumer.py @@ -0,0 +1,89 @@ +# Copyright 2024 Bloomberg Finance L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pika +import os + +from consumer_interface import mqConsumerInterface + +class RabbitMQConsumer(mqConsumerInterface): + def __init__( + self, binding_key: str, exchange_name: str, queue_name: str + ) -> None: + # Save parameters to class variables + + self.binding_key = binding_key + self.exchange_name = exchange_name + self.queue_name = queue_name + self.channel = None + self.connection = None + + # Call setupRMQConnection + self.setupRMQConnection() + + pass + + def setupRMQConnection(self) -> None: + + # Set-up Connection to RabbitMQ service + con_params = pika.URLParameters(os.environ["AMQP_URL"]) + self.connection = pika.BlockingConnection(con_params) + + # Establish Channel + self.channel = self.connection.channel() + + # Create Queue if not already present + self.channel.queue_declare(queue=self.queue_name) + + # Create the exchange if not already present + self.channel.exchange_declare( + exchange=self.exchange_name, + exchange_type="direct", + durable=True + ) + + # Bind Binding Key to Queue on the exchange + self.channel.queue_bind(queue=self.queue_name,exchange=self.exchange_name,routing_key=self.binding_key) + + # Set-up Callback function for receiving messages + self.channel.basic_consume(queue=self.queue_name, on_message_callback=self.on_message_callback) + + def on_message_callback( + self, channel, method_frame, header_frame, body + ) -> None: + # Acknowledge message + channel.basic_ack(method_frame.delivery_tag) + + #Print message (The message is contained in the body parameter variable) + print(f"Received: {body}") + + def startConsuming(self): + print("Started message consumption: waiting for message...") + + self.channel.start_consuming() + + def __del__(self) -> None: + + # Print "Closing RMQ connection on destruction" + print("Closing RMQ connection on destruction") + + # Close Channel + if self.channel: + self.channel.close() + self.channel = None + + # Close Connection + if self.connection: + self.connection.close() + self.connection = None