카테고리 없음

RabbitMQ 서버 연결 예시 코드 250523

mezzanineX 2025. 5. 23. 13:29

q1: RabbitMQ 서버 연결 예시 코드

a1:

아래 코드는 Python에서 pika 라이브러리를 이용해 RabbitMQ 서버에 연결하고, 추천 로그 메시지를 교환(exchange)–큐(queue) 구조로 발행(publish)하는 예제입니다. 하나씩 차근차근 살펴보겠습니다.



1. 라이브러리 임포트

import pika, json

• pika
RabbitMQ에 AMQP 프로토콜로 메시지를 주고받기 위한 대표적인 Python 클라이언트 라이브러리입니다.
• json
Python 객체를 JSON 문자열로 직렬화(encode)하거나, 반대로 JSON 문자열을 Python 객체로 역직렬화(decode)할 때 사용합니다.



2. RabbitMQ 서버 연결 설정

conn = pika.BlockingConnection(
    pika.ConnectionParameters(host='rabbitmq-server')
)
channel = conn.channel()

1. BlockingConnection
가장 기본적인 동기식 연결 방식입니다.
2. ConnectionParameters(host='rabbitmq-server')
연결할 RabbitMQ 서버의 호스트 이름(또는 IP)을 지정합니다.
3. conn.channel()
AMQP의 작업 단위인 “채널(channel)”을 하나 엽니다. 실제 메시지 생성·발행·구독 등은 이 채널을 통해 이뤄집니다.



3. 교환(exchange)·큐(queue) 선언 및 바인딩(binding)

channel.exchange_declare(
    exchange='recommendation.exchange',
    exchange_type='direct'
)

교환(exchange)
프로듀서가 보낸 메시지를 하나 이상의 큐로 라우팅(routing)하는 역할을 합니다.
exchange_type='direct'
메시지의 routing_key 값을 기반으로 정확히 일치하는 큐에만 전달합니다.

channel.queue_declare(
    queue='recommendation.log.queue',
    durable=True
)

큐(queue)
메시지를 저장하는 버퍼입니다.
durable=True
RabbitMQ 서버 재시작 시에도 해당 큐가 유지되도록 “지속성 있는 큐”로 선언합니다.

channel.queue_bind(
    exchange='recommendation.exchange',
    queue='recommendation.log.queue',
    routing_key='recommendation.log'
)

바인딩(binding)
특정 교환과 큐를 연결하고, 어떤 routing_key를 사용할지 지정합니다.
이 설정에 따라, 교환에 routing_key='recommendation.log' 로 발행된 메시지만 recommendation.log.queue에 쌓입니다.



4. 로그 전송 함수 정의

def send_log(user_id, recs):
    message = json.dumps({ "userId": user_id, "recs": recs })
    channel.basic_publish(
        exchange='recommendation.exchange',
        routing_key='recommendation.log',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # 메시지 영속성
        )
    )

1. 파라미터
• user_id : 로그를 남길 사용자 식별자 (예: 숫자 또는 문자열)
• recs : 추천 항목 리스트 (예: 추천 상품 ID 목록 등)
2. json.dumps({...})
• Python 딕셔너리를 JSON 문자열로 변환합니다.
• 결과 예시: {"userId": 123, "recs": ["itemA", "itemB", "itemC"]}
3. channel.basic_publish(...)
• exchange='recommendation.exchange'
앞서 선언한 교환으로 메시지를 발행합니다.
• routing_key='recommendation.log'
해당 키와 일치하는 큐로 라우팅됩니다.
• body=message
전송할 메시지 내용(바디)으로 JSON 문자열을 지정합니다.
• properties=pika.BasicProperties(delivery_mode=2)
delivery_mode=2 는 메시지를 디스크에 기록하여 RabbitMQ 재시작 시에도 보존하도록 하는 “영속성 있는 메시지” 설정입니다.



5. 요약
1. 연결을 맺고 → 2. 교환과 큐를 선언한 뒤 → 3. 둘을 바인딩
2. send_log() 함수를 호출하면
{ userId, recs } 형태의 JSON 메시지를
recommendation.exchange 교환으로 발행하고
recommendation.log.queue 큐에 쌓이도록 합니다.

이 구조를 통해 추천 시스템에서 생성된 사용자별 추천 결과를 로그 큐로 안정적으로 전달·저장할 수 있습니다.

반응형