NATS – 메시징 시스템

Go언어로 개발된 NATS는 클라우드 네이티브 애플리케이션, IoT 메시징, 마이크로서비스 아키텍처 등을 위한 고성능, 경량의 오픈소스 메시징 시스템입니다. 단순하고 확장성이 뛰어난 Pub/Sub 모델을 기반으로 하며, 이외에 다양한 메시지 패턴과 기능을 제공합니다.
여러 메시지 큐들의 처리량을 비교하는 아래 그래프를 보면 NATS에 성능이 뛰어남을 알수 있습니다. NATS는 발신 및 수신 처리량 모두 약 180,000 이상으로 매우 높은 처리량을 보여줍니다. Kafka는 발신 처리량은 약 90,000, 수신 처리량은 약 60,000으로, NATS 다음으로 높은 처리량을 보이지만 1/2배 정도 낮은 속도를 보입니다.
* 기본 메시지 패턴
  > Publish-Subscribe: 한 클라이언트가 특정 주제(subject)로 메시지를 발행하면, 그 주제를 구독하는 모든 클라이언트에게 메시지가 전달.
  > Request-Reply: 요청-응답 패턴으로, 클라이언트가 요청을 보내고 응답을 기다림.
  > Queue Groups: 작업자(워커) 그룹 사이에서 메시지를 로드 밸런싱함.

1. NATS 서버 실행

NATS 서버 바이너리 다운로드: 최신 버전의 NATS 서버 바이너리를 GitHub에서 다운로드합니다.
				
					wget https://github.com/nats-io/nats-server/releases/download/v2.10.2/nats-server-v2.10.2-linux-amd64.tar.gz

				
			
압축 해제: 다운로드한 파일을 압축 해제합니다.
				
					tar -xzf nats-server-v2.10.2-linux-amd64.tar.gz

				
			
서버 실행: NATS 서버를 기본 설정으로 실행.
				
					nats-server-v2.10.2-linux-amd64/nats-server
				
			

2. Python 테스트

모든 예제는 Python의 nats-py 라이브러리를 사용합니다. nats-py 라이브러리는 다음 명령어로 설치할 수 있습니다.
				
					pip install nats-py

				
			

3. Publish-Subscribe

발행자 : 특정 주제로 메시지를 발행.
구독자 : 특정 주제를 구독해서 메시지를 수신.
발행자 (publisher.py)
				
					import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")

    # Message to publish
    message = "Hello, this is a NATS message."

    # Publish the message
    await nc.publish("greetings", message.encode('utf-8'))
    print("Published a message.")

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
				
			
구독자 (subscriber.py)
				
					import asyncio
from nats.aio.client import Client as NATS

async def message_handler(msg):
    data = msg.data.decode('utf-8')
    print(f"Received message: {data}")

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")

    # Subscribe to the subject
    await nc.subscribe("greetings", cb=message_handler)
    print("Subscribed to subject 'greetings'.")

    # Keep the subscriber running
    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())

				
			
테스트 실행
				
					# Run the Subscriber
python subscriber.py

# In another terminal, Run the Publisher
python publisher.py
				
			

4. Request-Reply

요청자 : 특정 주제로 요청 메시지를 송신.
응답자 : 특정 주제를 구독하고 요청에 응답.
요청자 (requester.py)
				
					import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrTimeout

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")

    # Request message
    message = "What is the current time?"

    try:
        # Send a request
        response = await nc.request("time.request", message.encode('utf-8'), timeout=2)
        print(f"Received response: {response.data.decode('utf-8')}")
    except ErrTimeout:
        print("Request timed out.")

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
				
			
응답자 (replier.py)
				
					import asyncio
from datetime import datetime
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()

    async def request_handler(msg):
        # Print the request message
        request = msg.data.decode('utf-8')
        print(f"Received request: {request}")

        # Generate current time
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        response = f"The current time is {current_time}."

        # Send the response
        print(f"response subject: {msg.reply}")
        await nc.publish(msg.reply, response.encode('utf-8'))
        print("Sent a response.")

    await nc.connect("nats://127.0.0.1:4222")

    # Subscribe to the subject
    await nc.subscribe("time.request", cb=request_handler)
    print("Subscribed to subject 'time.request' and ready to handle requests.")

    # Keep the replier running
    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())
				
			
테스트 실행
				
					# Run the Subscriber
python replier.py

# In another terminal, Run the Publisher
python requester.py
				
			

5. Queue Groups (로드 밸런싱)

여러 작업자(워커)가 같은 큐 그룹에 가입하여 작업을 분산 처리.
메시지는 큐 그룹 내의 한 작업자에게만 전달됨.
작업 발행자 (task_publisher.py)
				
					import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")

    # Publish tasks
    for i in range(1, 11):
        task = f"Task number {i}"
        await nc.publish("tasks", task.encode('utf-8'))
        print(f"Published task: {task}")
        await asyncio.sleep(0.5)

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
				
			
작업자 (worker.py)
				
					import asyncio
import sys
from nats.aio.client import Client as NATS

async def main(worker_id):
    nc = NATS()

    async def task_handler(msg):
        task = msg.data.decode('utf-8')
        print(f"Worker {worker_id} processing {task}...")
        # Simulate task processing
        await asyncio.sleep(1)
        print(f"Worker {worker_id} completed {task}")

    await nc.connect("nats://127.0.0.1:4222")

    # Subscribe to the subject within a queue group
    await nc.subscribe("tasks", queue="workers", cb=task_handler)
    print(f"Worker {worker_id} has started.")

    # Keep the worker running
    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    worker_id = sys.argv[1] if len(sys.argv) > 1 else '1'
    asyncio.run(main(worker_id))
				
			
테스트 실행
				
					# Start multiple workers in separate terminals
python worker.py 1
python worker.py 2
python worker.py 3

# In another terminal, Run the Task Publisher
python task_publisher.py
				
			

6. 메시지 헤더

메시지에 헤더를 추가하여 메타데이터를 전달할 수 있음.
헤더와 함께 메시지 발행 (header_publisher.py)
				
					import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")

    # Set headers
    headers = {
        "Content-Type": "application/json",
        "X-Request-ID": "12345"
    }

    # Message to publish
    message = '{"message": "This message includes headers."}'

    # Publish the message with headers
    await nc.publish("updates", message.encode('utf-8'), headers=headers)
    print("Published a message with headers.")

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
				
			
헤더를 읽는 구독자 (header_subscriber.py)
				
					import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()

    async def message_handler(msg):
        data = msg.data.decode('utf-8')
        headers = msg.headers

        print(f"Received message: {data}")
        if headers:
            for key, value in headers.items():
                print(f"Header {key}: {value}")
        else:
            print("No headers received.")

    await nc.connect("nats://127.0.0.1:4222")

    # Subscribe to the subject
    await nc.subscribe("updates", cb=message_handler)
    print("Subscribed to subject 'updates'.")

    # Keep the subscriber running
    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())
				
			
테스트 실행
				
					# Run the subscriber
python header_subscriber.py

# In another terminal, Run the publisher
python task_publisher.py
				
			

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다