Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ node_modules/
# NextJS
next-env.d.ts
.next/
.venv
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
import os
import sys

from solution.consumer_sol import mqConsumer # pylint: disable=import-error

from solution.rabbit_mq_consumer import RabbitMQConsumer

def main() -> None:
consumer = mqConsumer(binding_key="Tech Lab Key",exchange_name="Tech Lab Exchange",queue_name="Tech Lab Queue")
consumer = RabbitMQConsumer(binding_key="Tech Lab Key",exchange_name="Tech Lab Exchange",queue_name="Tech Lab Queue")
consumer.startConsuming()


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2024 Bloomberg Finance L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pika
import os

from consumer_interface import mqConsumerInterface

class RabbitMQConsumer(mqConsumerInterface):
def __init__(
self, binding_key: str, exchange_name: str, queue_name: str
) -> None:
# Save parameters to class variables

self.binding_key = binding_key
self.exchange_name = exchange_name
self.queue_name = queue_name
self.channel = None
self.connection = None

# Call setupRMQConnection
self.setupRMQConnection()

pass

def setupRMQConnection(self) -> None:

# Set-up Connection to RabbitMQ service
con_params = pika.URLParameters(os.environ["AMQP_URL"])
self.connection = pika.BlockingConnection(con_params)

# Establish Channel
self.channel = self.connection.channel()

# Create Queue if not already present
self.channel.queue_declare(queue=self.queue_name)

# Create the exchange if not already present
self.channel.exchange_declare(
exchange=self.exchange_name,
exchange_type="direct",
durable=True
)

# Bind Binding Key to Queue on the exchange
self.channel.queue_bind(queue=self.queue_name,exchange=self.exchange_name,routing_key=self.binding_key)

# Set-up Callback function for receiving messages
self.channel.basic_consume(queue=self.queue_name, on_message_callback=self.on_message_callback)

def on_message_callback(
self, channel, method_frame, header_frame, body
) -> None:
# Acknowledge message
channel.basic_ack(method_frame.delivery_tag)

#Print message (The message is contained in the body parameter variable)
print(f"Received: {body}")

def startConsuming(self):
print("Started message consumption: waiting for message...")

self.channel.start_consuming()

def __del__(self) -> None:

# Print "Closing RMQ connection on destruction"
print("Closing RMQ connection on destruction")

# Close Channel
if self.channel:
self.channel.close()
self.channel = None

# Close Connection
if self.connection:
self.connection.close()
self.connection = None