Go언어로 개발된 NATS는 클라우드 네이티브 애플리케이션, IoT 메시징, 마이크로서비스 아키텍처 등을 위한 고성능, 경량의 오픈소스 메시징 시스템입니다. 단순하고 확장성이 뛰어난 Pub/Sub 모델을 기반으로 하며, 이외에 다양한 메시지 패턴과 기능을 제공합니다.
여러 메시지 큐들의 처리량을 비교하는 아래 그래프를 보면 NATS에 성능이 뛰어남을 알수 있습니다. NATS는 발신 및 수신 처리량 모두 약 180,000 이상으로 매우 높은 처리량을 보여줍니다. Kafka는 발신 처리량은 약 90,000, 수신 처리량은 약 60,000으로, NATS 다음으로 높은 처리량을 보이지만 1/2배 정도 낮은 속도를 보입니다.

* 기본 메시지 패턴
> Publish-Subscribe: 한 클라이언트가 특정 주제(subject)로 메시지를 발행하면, 그 주제를 구독하는 모든 클라이언트에게 메시지가 전달.
> Request-Reply: 요청-응답 패턴으로, 클라이언트가 요청을 보내고 응답을 기다림.
> Queue Groups: 작업자(워커) 그룹 사이에서 메시지를 로드 밸런싱함.
> Publish-Subscribe: 한 클라이언트가 특정 주제(subject)로 메시지를 발행하면, 그 주제를 구독하는 모든 클라이언트에게 메시지가 전달.
> Request-Reply: 요청-응답 패턴으로, 클라이언트가 요청을 보내고 응답을 기다림.
> Queue Groups: 작업자(워커) 그룹 사이에서 메시지를 로드 밸런싱함.
1. NATS 서버 실행
NATS 서버 바이너리 다운로드: 최신 버전의 NATS 서버 바이너리를 GitHub에서 다운로드합니다.
wget https://github.com/nats-io/nats-server/releases/download/v2.10.2/nats-server-v2.10.2-linux-amd64.tar.gz
압축 해제: 다운로드한 파일을 압축 해제합니다.
tar -xzf nats-server-v2.10.2-linux-amd64.tar.gz
서버 실행: NATS 서버를 기본 설정으로 실행.
nats-server-v2.10.2-linux-amd64/nats-server
2. Python 테스트
모든 예제는 Python의 nats-py 라이브러리를 사용합니다. nats-py 라이브러리는 다음 명령어로 설치할 수 있습니다.
pip install nats-py
3. Publish-Subscribe
발행자 : 특정 주제로 메시지를 발행.
구독자 : 특정 주제를 구독해서 메시지를 수신.

발행자 (publisher.py)
import asyncio
from nats.aio.client import Client as NATS
async def main():
nc = NATS()
await nc.connect("nats://127.0.0.1:4222")
# Message to publish
message = "Hello, this is a NATS message."
# Publish the message
await nc.publish("greetings", message.encode('utf-8'))
print("Published a message.")
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
구독자 (subscriber.py)
import asyncio
from nats.aio.client import Client as NATS
async def message_handler(msg):
data = msg.data.decode('utf-8')
print(f"Received message: {data}")
async def main():
nc = NATS()
await nc.connect("nats://127.0.0.1:4222")
# Subscribe to the subject
await nc.subscribe("greetings", cb=message_handler)
print("Subscribed to subject 'greetings'.")
# Keep the subscriber running
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
테스트 실행
# Run the Subscriber
python subscriber.py
# In another terminal, Run the Publisher
python publisher.py
4. Request-Reply
요청자 : 특정 주제로 요청 메시지를 송신.
응답자 : 특정 주제를 구독하고 요청에 응답.

요청자 (requester.py)
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrTimeout
async def main():
nc = NATS()
await nc.connect("nats://127.0.0.1:4222")
# Request message
message = "What is the current time?"
try:
# Send a request
response = await nc.request("time.request", message.encode('utf-8'), timeout=2)
print(f"Received response: {response.data.decode('utf-8')}")
except ErrTimeout:
print("Request timed out.")
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
응답자 (replier.py)
import asyncio
from datetime import datetime
from nats.aio.client import Client as NATS
async def main():
nc = NATS()
async def request_handler(msg):
# Print the request message
request = msg.data.decode('utf-8')
print(f"Received request: {request}")
# Generate current time
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
response = f"The current time is {current_time}."
# Send the response
print(f"response subject: {msg.reply}")
await nc.publish(msg.reply, response.encode('utf-8'))
print("Sent a response.")
await nc.connect("nats://127.0.0.1:4222")
# Subscribe to the subject
await nc.subscribe("time.request", cb=request_handler)
print("Subscribed to subject 'time.request' and ready to handle requests.")
# Keep the replier running
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
테스트 실행
# Run the Subscriber
python replier.py
# In another terminal, Run the Publisher
python requester.py
5. Queue Groups (로드 밸런싱)
여러 작업자(워커)가 같은 큐 그룹에 가입하여 작업을 분산 처리.
메시지는 큐 그룹 내의 한 작업자에게만 전달됨.

작업 발행자 (task_publisher.py)
import asyncio
from nats.aio.client import Client as NATS
async def main():
nc = NATS()
await nc.connect("nats://127.0.0.1:4222")
# Publish tasks
for i in range(1, 11):
task = f"Task number {i}"
await nc.publish("tasks", task.encode('utf-8'))
print(f"Published task: {task}")
await asyncio.sleep(0.5)
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
작업자 (worker.py)
import asyncio
import sys
from nats.aio.client import Client as NATS
async def main(worker_id):
nc = NATS()
async def task_handler(msg):
task = msg.data.decode('utf-8')
print(f"Worker {worker_id} processing {task}...")
# Simulate task processing
await asyncio.sleep(1)
print(f"Worker {worker_id} completed {task}")
await nc.connect("nats://127.0.0.1:4222")
# Subscribe to the subject within a queue group
await nc.subscribe("tasks", queue="workers", cb=task_handler)
print(f"Worker {worker_id} has started.")
# Keep the worker running
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
worker_id = sys.argv[1] if len(sys.argv) > 1 else '1'
asyncio.run(main(worker_id))
테스트 실행
# Start multiple workers in separate terminals
python worker.py 1
python worker.py 2
python worker.py 3
# In another terminal, Run the Task Publisher
python task_publisher.py
6. 메시지 헤더
메시지에 헤더를 추가하여 메타데이터를 전달할 수 있음.
헤더와 함께 메시지 발행 (header_publisher.py)
import asyncio
from nats.aio.client import Client as NATS
async def main():
nc = NATS()
await nc.connect("nats://127.0.0.1:4222")
# Set headers
headers = {
"Content-Type": "application/json",
"X-Request-ID": "12345"
}
# Message to publish
message = '{"message": "This message includes headers."}'
# Publish the message with headers
await nc.publish("updates", message.encode('utf-8'), headers=headers)
print("Published a message with headers.")
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
헤더를 읽는 구독자 (header_subscriber.py)
import asyncio
from nats.aio.client import Client as NATS
async def main():
nc = NATS()
async def message_handler(msg):
data = msg.data.decode('utf-8')
headers = msg.headers
print(f"Received message: {data}")
if headers:
for key, value in headers.items():
print(f"Header {key}: {value}")
else:
print("No headers received.")
await nc.connect("nats://127.0.0.1:4222")
# Subscribe to the subject
await nc.subscribe("updates", cb=message_handler)
print("Subscribed to subject 'updates'.")
# Keep the subscriber running
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
테스트 실행
# Run the subscriber
python header_subscriber.py
# In another terminal, Run the publisher
python task_publisher.py